P3-X2: tree.list synchronous WS RPC + per-session cache
Foundational for the restore wizard's tree browser. The wizard needs to lazy-load directory contents from a snapshot as the operator drills down; this lands the transport. - internal/api adds MsgTreeList (server → agent) + MsgTreeListResult (agent → server) with TreeListRequestPayload / TreeListEntry / TreeListResultPayload types. Reply correlates by Envelope.ID. - internal/restic.ListTreeChildren wraps 'restic ls --json' and filters its recursive output to direct children of the requested path. Parser + path-normalisation + isDirectChild are unit-tested. - internal/server/ws/rpc.go introduces a generic SendRPC helper on Hub: register a buffered channel keyed by ULID, send the request, block on ctx.Done()/timeout/reply. Reply routing piggybacks on the existing dispatchAgentMessage by adding a MsgTreeListResult case that forwards to the registered waiter; if no waiter is registered (caller already gave up) the stray reply is dropped quietly. - cmd/agent gains a tree.list handler that runs ListTreeChildren on a fresh per-call context (60s ceiling) and ships the matching tree.list.result envelope. Errors surface in result.Error rather than as transport failures so the server-side waiter can render a sensible UI message. - internal/server/http/tree_cache.go is the per-wizard-session cache layer (~30min TTL, sweep-on-access) that fetchTreeWithCache uses before falling through to SendRPC. Cached on success only; agent errors aren't cached so a transient failure doesn't poison the session. Tests: - internal/restic/ls_test.go covers parseLsChildren at root / mid-tree / leaf, plus normalizeTreePath and isDirectChild edge cases. - internal/server/ws/rpc_test.go unit-tests the registry: round-trip, release semantics, concurrent waiters, ctx-cancel. - internal/server/http/tree_rpc_test.go is the full round-trip: server SendRPC → fake-agent over a real WS → reply → server gets the payload. Plus a timeout test that confirms ~300ms timeouts terminate in ~300ms rather than waiting forever. The cache is plumbed but no UI handler hits fetchTreeWithCache yet — that lands with P3-01 (wizard backend). The unused-linter is suppressed via nolint until the wizard wires it in.
This commit is contained in:
@@ -0,0 +1,146 @@
|
||||
// tree_rpc_test.go — full round-trip test for the tree.list synchronous
|
||||
// RPC (P3-X2). A fake agent reads the inbound tree.list, replies with a
|
||||
// canned tree.list.result, and we assert the server's SendRPC returned
|
||||
// the expected payload.
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
)
|
||||
|
||||
func TestSendRPCTreeListRoundTrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
srv, ts, st := rawTestServer(t)
|
||||
hostID, token := enrolHostForWS(t, srv, st, "rpc-host")
|
||||
c := agentDial(t, srv, ts, hostID, token)
|
||||
sendHello(t, c, "rpc-host")
|
||||
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||
|
||||
// Fake agent: read inbound envelopes, mirror tree.list with a
|
||||
// canned result. Other inbound envelopes (config.update etc) are
|
||||
// already drained above.
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
for {
|
||||
mt, raw, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
if mt != websocket.MessageText {
|
||||
continue
|
||||
}
|
||||
var env api.Envelope
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
if env.Type != api.MsgTreeList {
|
||||
continue
|
||||
}
|
||||
var req api.TreeListRequestPayload
|
||||
if err := env.UnmarshalPayload(&req); err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
result := api.TreeListResultPayload{
|
||||
SnapshotID: req.SnapshotID,
|
||||
Path: req.Path,
|
||||
Entries: []api.TreeListEntry{
|
||||
{Name: "etc", Type: "dir"},
|
||||
{Name: "var", Type: "dir"},
|
||||
},
|
||||
}
|
||||
out, err := api.Marshal(api.MsgTreeListResult, env.ID, result)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
rawOut, _ := json.Marshal(out)
|
||||
if err := c.Write(ctx, websocket.MessageText, rawOut); err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
done <- nil
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Server-side SendRPC.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
|
||||
defer cancel()
|
||||
reply, err := srv.deps.Hub.SendRPC(ctx, hostID, api.MsgTreeList,
|
||||
api.TreeListRequestPayload{SnapshotID: "f3a7b2c1", Path: "/"},
|
||||
3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("SendRPC: %v", err)
|
||||
}
|
||||
if reply.Type != api.MsgTreeListResult {
|
||||
t.Fatalf("reply type: got %q want %q", reply.Type, api.MsgTreeListResult)
|
||||
}
|
||||
var result api.TreeListResultPayload
|
||||
if err := reply.UnmarshalPayload(&result); err != nil {
|
||||
t.Fatalf("unmarshal reply: %v", err)
|
||||
}
|
||||
if result.SnapshotID != "f3a7b2c1" || result.Path != "/" {
|
||||
t.Fatalf("payload: got %+v", result)
|
||||
}
|
||||
if len(result.Entries) != 2 || result.Entries[0].Name != "etc" {
|
||||
t.Fatalf("entries: %+v", result.Entries)
|
||||
}
|
||||
|
||||
// Make sure the fake agent didn't error out.
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
t.Fatalf("fake agent: %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("fake agent didn't finish")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSendRPCTimeoutNoReply: SendRPC times out cleanly when the agent
|
||||
// never replies; the registry entry is released so a stray late reply
|
||||
// wouldn't deadlock anything.
|
||||
func TestSendRPCTimeoutNoReply(t *testing.T) {
|
||||
t.Parallel()
|
||||
srv, ts, st := rawTestServer(t)
|
||||
hostID, token := enrolHostForWS(t, srv, st, "rpc-timeout-host")
|
||||
c := agentDial(t, srv, ts, hostID, token)
|
||||
sendHello(t, c, "rpc-timeout-host")
|
||||
_ = drainUntil(t, c, api.MsgScheduleSet)
|
||||
|
||||
// Fake agent reads but never replies.
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
for {
|
||||
if _, _, err := c.Read(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
t0 := time.Now()
|
||||
_, err := srv.deps.Hub.SendRPC(ctx, hostID, api.MsgTreeList,
|
||||
api.TreeListRequestPayload{SnapshotID: "x", Path: "/"},
|
||||
300*time.Millisecond)
|
||||
if err == nil {
|
||||
t.Fatal("expected timeout error")
|
||||
}
|
||||
elapsed := time.Since(t0)
|
||||
if elapsed < 250*time.Millisecond || elapsed > 2*time.Second {
|
||||
t.Fatalf("timeout took %s, expected ~300ms", elapsed)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user