ui+server: per-job bandwidth override on Run-now
P2R-13b. POST /hosts/{id}/source-groups/{gid}/run accepts optional
bandwidth_up_kbps / bandwidth_down_kbps form fields, plumbs them onto
CommandRunPayload. Agent dispatcher already prefers per-job override
over host-wide caps (T1). UI wraps the Run-now button in a form with
a <details> 'Limit bandwidth for this run' disclosure containing two
KB/s inputs.
This commit is contained in:
@@ -9,6 +9,7 @@ package http
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
stdhttp "net/http"
|
stdhttp "net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
|
|
||||||
@@ -16,6 +17,34 @@ import (
|
|||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// parseBandwidthOverride pulls optional bandwidth_up_kbps /
|
||||||
|
// bandwidth_down_kbps from the request (form or query). Returns nil
|
||||||
|
// for any field absent or empty; an explicit "0" produces a non-nil
|
||||||
|
// pointer to 0 — i.e., "no cap for this run, even if the host has
|
||||||
|
// one set." Non-integers / negatives are rejected with an error.
|
||||||
|
func parseBandwidthOverride(r *stdhttp.Request) (up *int, down *int, err error) {
|
||||||
|
parse := func(name string) (*int, error) {
|
||||||
|
v := r.FormValue(name)
|
||||||
|
if v == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
n, perr := strconv.Atoi(v)
|
||||||
|
if perr != nil {
|
||||||
|
return nil, errors.New(name + " must be an integer")
|
||||||
|
}
|
||||||
|
if n < 0 {
|
||||||
|
return nil, errors.New(name + " must be >= 0")
|
||||||
|
}
|
||||||
|
return &n, nil
|
||||||
|
}
|
||||||
|
up, err = parse("bandwidth_up_kbps")
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
down, err = parse("bandwidth_down_kbps")
|
||||||
|
return up, down, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleRunSourceGroup(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
func (s *Server) handleRunSourceGroup(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
||||||
user, ok := s.requireUser(r)
|
user, ok := s.requireUser(r)
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -40,13 +69,25 @@ func (s *Server) handleRunSourceGroup(w stdhttp.ResponseWriter, r *stdhttp.Reque
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Optional per-run bandwidth override. Disclosed in the UI under a
|
||||||
|
// <details> "Limit bandwidth for this run" affordance; absent on
|
||||||
|
// the wire (and from JSON callers that don't supply it) means
|
||||||
|
// "fall back to the host's standing caps."
|
||||||
|
upOverride, downOverride, perr := parseBandwidthOverride(r)
|
||||||
|
if perr != nil {
|
||||||
|
s.runGroupError(w, r, stdhttp.StatusBadRequest, "invalid_value", perr.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Backup invocations don't consume RetentionPolicy — that lives on
|
// Backup invocations don't consume RetentionPolicy — that lives on
|
||||||
// forget. Sending the resolved set here would just be dead weight.
|
// forget. Sending the resolved set here would just be dead weight.
|
||||||
res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobBackup,
|
res, status, code, msg := s.dispatchJobWithPayload(r.Context(), user, hostID, api.JobBackup,
|
||||||
api.CommandRunPayload{
|
api.CommandRunPayload{
|
||||||
Includes: g.Includes,
|
Includes: g.Includes,
|
||||||
Excludes: g.Excludes,
|
Excludes: g.Excludes,
|
||||||
Tag: g.Name,
|
Tag: g.Name,
|
||||||
|
BandwidthUpKBps: upOverride,
|
||||||
|
BandwidthDownKBps: downOverride,
|
||||||
})
|
})
|
||||||
if code != "" {
|
if code != "" {
|
||||||
s.runGroupError(w, r, status, code, msg)
|
s.runGroupError(w, r, status, code, msg)
|
||||||
|
|||||||
@@ -0,0 +1,133 @@
|
|||||||
|
// run_group_bandwidth_test.go — covers the per-job bandwidth override
|
||||||
|
// that operators can set via the Run-now form's "Limit bandwidth for
|
||||||
|
// this run" disclosure (P2R-13b).
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
stdhttp "net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coder/websocket"
|
||||||
|
"github.com/oklog/ulid/v2"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestRunSourceGroupBandwidthOverride: connect a fake agent, POST the
|
||||||
|
// per-group Run-now endpoint with bandwidth_up_kbps=512, assert the
|
||||||
|
// dispatched command.run carries it.
|
||||||
|
func TestRunSourceGroupBandwidthOverride(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
srv, ts, st := rawTestServer(t)
|
||||||
|
hostID, token := enrolHostForWS(t, srv, st, "bw-host")
|
||||||
|
|
||||||
|
// Pre-seed an init job so auto-init doesn't fire on hello and
|
||||||
|
// pollute our envelope sequence.
|
||||||
|
if err := st.CreateJob(context.Background(), store.Job{
|
||||||
|
ID: ulid.Make().String(), HostID: hostID, Kind: "init",
|
||||||
|
ActorKind: "system", CreatedAt: time.Now().UTC(),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("seed init: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "etc", Includes: []string{"/etc"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("group: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := agentDial(t, srv, ts, hostID, token)
|
||||||
|
sendHello(t, c, "bw-host")
|
||||||
|
// Drain on-hello burst before issuing the run-now.
|
||||||
|
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||||
|
|
||||||
|
cookie := loginAsAdmin(t, st)
|
||||||
|
form := url.Values{
|
||||||
|
"bandwidth_up_kbps": {"512"},
|
||||||
|
"bandwidth_down_kbps": {"256"},
|
||||||
|
}
|
||||||
|
req, _ := stdhttp.NewRequest("POST",
|
||||||
|
ts.URL+"/hosts/"+hostID+"/source-groups/"+gid+"/run",
|
||||||
|
strings.NewReader(form.Encode()))
|
||||||
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
req.AddCookie(cookie)
|
||||||
|
res, err := stdhttp.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("do: %v", err)
|
||||||
|
}
|
||||||
|
res.Body.Close()
|
||||||
|
if res.StatusCode != stdhttp.StatusAccepted {
|
||||||
|
t.Fatalf("status: got %d, want 202", res.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the dispatched command.run; assert overrides are present.
|
||||||
|
deadline := time.Now().Add(3 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
|
||||||
|
mt, raw, rerr := c.Read(ctx)
|
||||||
|
cancel()
|
||||||
|
if rerr != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if mt != websocket.MessageText {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var env api.Envelope
|
||||||
|
_ = json.Unmarshal(raw, &env)
|
||||||
|
if env.Type != api.MsgCommandRun {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var p api.CommandRunPayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err != nil {
|
||||||
|
t.Fatalf("unmarshal: %v", err)
|
||||||
|
}
|
||||||
|
if p.Kind != api.JobBackup {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if p.BandwidthUpKBps == nil || *p.BandwidthUpKBps != 512 {
|
||||||
|
t.Fatalf("BandwidthUpKBps: got %v, want 512", p.BandwidthUpKBps)
|
||||||
|
}
|
||||||
|
if p.BandwidthDownKBps == nil || *p.BandwidthDownKBps != 256 {
|
||||||
|
t.Fatalf("BandwidthDownKBps: got %v, want 256", p.BandwidthDownKBps)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Fatal("timed out waiting for command.run with bandwidth override")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRunSourceGroupBandwidthRejectsNegative: invalid value → 400.
|
||||||
|
func TestRunSourceGroupBandwidthRejectsNegative(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
_, url2, st := newTestServerWithHub(t)
|
||||||
|
cookie := loginAsAdmin(t, st)
|
||||||
|
hostID := makeHost(t, st, "bw-rej-host")
|
||||||
|
gid := ulid.Make().String()
|
||||||
|
if err := st.CreateSourceGroup(context.Background(), &store.SourceGroup{
|
||||||
|
ID: gid, HostID: hostID, Name: "etc", Includes: []string{"/etc"},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("group: %v", err)
|
||||||
|
}
|
||||||
|
form := url.Values{"bandwidth_up_kbps": {"-1"}}
|
||||||
|
req, _ := stdhttp.NewRequest("POST",
|
||||||
|
url2+"/hosts/"+hostID+"/source-groups/"+gid+"/run",
|
||||||
|
strings.NewReader(form.Encode()))
|
||||||
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
req.AddCookie(cookie)
|
||||||
|
res, err := stdhttp.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("do: %v", err)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != stdhttp.StatusBadRequest {
|
||||||
|
t.Fatalf("status: got %d, want 400", res.StatusCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -53,12 +53,27 @@
|
|||||||
{{if gt $row.SnapshotCount 0}} · <span class="mono">{{$row.SnapshotCount}}</span> snapshot{{if ne $row.SnapshotCount 1}}s{{end}}{{end}}
|
{{if gt $row.SnapshotCount 0}} · <span class="mono">{{$row.SnapshotCount}}</span> snapshot{{if ne $row.SnapshotCount 1}}s{{end}}{{end}}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div class="flex justify-end row-action" style="gap: 6px;">
|
<div class="flex flex-col items-end row-action" style="gap: 6px;">
|
||||||
{{if and (gt (len $g.Includes) 0) (eq $host.Status "online")}}
|
{{if and (gt (len $g.Includes) 0) (eq $host.Status "online")}}
|
||||||
<button class="btn btn-primary"
|
<form id="run-{{$g.ID}}" class="flex flex-col items-end" style="gap: 4px;">
|
||||||
hx-post="/hosts/{{$host.ID}}/source-groups/{{$g.ID}}/run"
|
<button class="btn btn-primary"
|
||||||
hx-swap="none"
|
hx-post="/hosts/{{$host.ID}}/source-groups/{{$g.ID}}/run"
|
||||||
hx-disabled-elt="this">Run now</button>
|
hx-include="#run-{{$g.ID}}"
|
||||||
|
hx-swap="none"
|
||||||
|
hx-disabled-elt="this">Run now</button>
|
||||||
|
<details class="text-[11px] text-ink-fade" style="text-align: right;">
|
||||||
|
<summary class="cursor-pointer hover:text-ink-mid select-none">Limit bandwidth for this run</summary>
|
||||||
|
<div class="flex items-center mt-2" style="gap: 6px; font-family: var(--font-mono);">
|
||||||
|
<label class="text-[10.5px] text-ink-mute">↑</label>
|
||||||
|
<input type="number" min="0" name="bandwidth_up_kbps" placeholder="—" class="input mono"
|
||||||
|
style="width: 70px; height: 22px; padding: 0 6px; font-size: 11px;">
|
||||||
|
<label class="text-[10.5px] text-ink-mute">↓</label>
|
||||||
|
<input type="number" min="0" name="bandwidth_down_kbps" placeholder="—" class="input mono"
|
||||||
|
style="width: 70px; height: 22px; padding: 0 6px; font-size: 11px;">
|
||||||
|
<span class="text-[10.5px] text-ink-fade">KB/s</span>
|
||||||
|
</div>
|
||||||
|
</details>
|
||||||
|
</form>
|
||||||
{{else}}
|
{{else}}
|
||||||
<button class="btn" disabled
|
<button class="btn" disabled
|
||||||
title="{{if eq (len $g.Includes) 0}}add at least one include path before running{{else}}host is offline{{end}}">Run now</button>
|
title="{{if eq (len $g.Includes) 0}}add at least one include path before running{{else}}host is offline{{end}}">Run now</button>
|
||||||
|
|||||||
Reference in New Issue
Block a user