P3-X2: tree.list synchronous WS RPC + per-session cache

Foundational for the restore wizard's tree browser. The wizard needs to
lazy-load directory contents from a snapshot as the operator drills
down; this lands the transport.

- internal/api adds MsgTreeList (server → agent) + MsgTreeListResult
  (agent → server) with TreeListRequestPayload / TreeListEntry /
  TreeListResultPayload types. Reply correlates by Envelope.ID.
- internal/restic.ListTreeChildren wraps 'restic ls --json' and
  filters its recursive output to direct children of the requested
  path. Parser + path-normalisation + isDirectChild are unit-tested.
- internal/server/ws/rpc.go introduces a generic SendRPC helper on
  Hub: register a buffered channel keyed by ULID, send the request,
  block on ctx.Done()/timeout/reply. Reply routing piggybacks on the
  existing dispatchAgentMessage by adding a MsgTreeListResult case
  that forwards to the registered waiter; if no waiter is registered
  (caller already gave up) the stray reply is dropped quietly.
- cmd/agent gains a tree.list handler that runs ListTreeChildren on a
  fresh per-call context (60s ceiling) and ships the matching
  tree.list.result envelope. Errors surface in result.Error rather
  than as transport failures so the server-side waiter can render a
  sensible UI message.
- internal/server/http/tree_cache.go is the per-wizard-session cache
  layer (~30min TTL; an expired entry is evicted lazily when Get hits
  it, and Sweep is available for the wizard handler to call) that
  fetchTreeWithCache consults before falling through to SendRPC. Only
  successful results are cached; agent errors aren't, so a transient
  failure doesn't poison the session.

Tests:
- internal/restic/ls_test.go covers parseLsChildren at root / mid-tree
  / leaf, plus normalizeTreePath and isDirectChild edge cases.
- internal/server/ws/rpc_test.go unit-tests the registry: round-trip,
  release semantics, concurrent waiters, ctx-cancel.
- internal/server/http/tree_rpc_test.go is the full round-trip: server
  SendRPC → fake-agent over a real WS → reply → server gets the
  payload. Plus a timeout test that confirms ~300ms timeouts terminate
  in ~300ms rather than waiting forever.

The cache is plumbed but no UI handler hits fetchTreeWithCache yet —
that lands with P3-01 (wizard backend). The unused-linter is suppressed
via nolint until the wizard wires it in.
This commit is contained in:
2026-05-04 15:19:22 +01:00
parent 94149a7324
commit 13f58bd052
12 changed files with 905 additions and 12 deletions
+6
View File
@@ -58,6 +58,11 @@ type Server struct {
// pending_id so the accept/reject handlers can push the bearer
// or close cleanly (P2-18b).
pendingHub *pendingHub
// treeCache holds per-wizard-session listings of snapshot
// directories (P3-X2). Pre-allocated in New so the lazy-init
// race is impossible.
treeCache *treeCache
}
// New builds a configured but not-yet-started server.
@@ -81,6 +86,7 @@ func New(deps Deps) *Server {
drainLocks: make(map[string]*sync.Mutex),
announceRL: newAnnounceLimiter(),
pendingHub: newPendingHub(),
treeCache: newTreeCache(),
}
s.routes(r)
+112
View File
@@ -0,0 +1,112 @@
package http
import (
"context"
"sync"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// treeCacheTTL is how long a per-session cached directory listing
// stays valid, measured from the timestamp handed to Put. The whole
// point of the cache is to make re-expanding nodes within the same
// wizard session snappy; 30 minutes covers a generous wizard
// interaction window without holding stale data indefinitely.
const treeCacheTTL = 30 * time.Minute
// treeCacheKey identifies one cached listing. SessionID scopes
// entries to a single browser session so two operators don't share
// view state; HostID is the host the tree.list RPC was sent to;
// SnapshotID + Path identify the directory inside the snapshot.
type treeCacheKey struct {
	// SessionID is the wizard/browser session that asked for the listing.
	SessionID string
	// HostID is the host whose agent served the listing.
	HostID string
	// SnapshotID is the snapshot being browsed.
	SnapshotID string
	// Path is the directory inside the snapshot. fetchTreeWithCache
	// stores it verbatim, so differently spelled but equivalent paths
	// occupy separate entries.
	Path string
}
// treeCacheEntry pairs one cached listing with the instant it stops
// being valid.
type treeCacheEntry struct {
	// Result is the listing exactly as it was handed to Put.
	Result api.TreeListResultPayload
	// ExpiresAt is the cut-off; Get and Sweep drop the entry once the
	// supplied "now" is strictly after it.
	ExpiresAt time.Time
}
// treeCache is a per-process map of synchronously fetched directory
// listings. Concurrency is light (a few entries per active wizard
// session) so a single mutex is fine. Build one with newTreeCache,
// which allocates the map; Put would panic on a nil map.
type treeCache struct {
	// mu serialises every access to entries.
	mu sync.Mutex
	// entries holds the cached listings keyed by session/host/snapshot/path.
	entries map[treeCacheKey]treeCacheEntry
}
// newTreeCache returns an empty cache whose entry map is already
// allocated, so Put can be called straight away.
func newTreeCache() *treeCache {
	c := new(treeCache)
	c.entries = map[treeCacheKey]treeCacheEntry{}
	return c
}
// Get looks up the listing stored under k. It reports true only when
// an entry exists and now is not past its expiry; an entry found to be
// expired is evicted on the spot and reported as a miss.
func (c *treeCache) Get(k treeCacheKey, now time.Time) (api.TreeListResultPayload, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	entry, found := c.entries[k]
	switch {
	case !found:
		return api.TreeListResultPayload{}, false
	case now.After(entry.ExpiresAt):
		// Stale: drop it now rather than waiting for a Sweep.
		delete(c.entries, k)
		return api.TreeListResultPayload{}, false
	default:
		return entry.Result, true
	}
}
// Put stores result under k, replacing any previous entry, with an
// expiry of now + treeCacheTTL. Put does not inspect the result: the
// caller must already have checked that it is worth caching
// (Error == "").
func (c *treeCache) Put(k treeCacheKey, result api.TreeListResultPayload, now time.Time) {
	fresh := treeCacheEntry{
		Result:    result,
		ExpiresAt: now.Add(treeCacheTTL),
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[k] = fresh
}
// Sweep walks the whole cache and removes every entry whose expiry is
// before now. It is meant to be invoked opportunistically by the
// wizard handler; the cache is small enough that no background
// goroutine is needed.
func (c *treeCache) Sweep(now time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for key, entry := range c.entries {
		if !now.After(entry.ExpiresAt) {
			continue // still valid
		}
		delete(c.entries, key)
	}
}
// fetchTreeWithCache resolves a directory listing for one wizard
// session. A live cache entry is returned directly; otherwise a
// synchronous tree.list RPC (30s ceiling) is issued to the host's
// agent.
//
// A transport or decode failure is returned as a non-nil error with a
// zero payload. An agent-side failure is not an error here: the
// payload comes back with its Error field set and is deliberately left
// out of the cache so a transient failure doesn't poison later
// requests for the same path. Successful listings are cached with a
// TTL counted from the moment this call started.
//
//nolint:unused // wired in by the wizard handler in the next slice
func (s *Server) fetchTreeWithCache(ctx context.Context, sessionID, hostID, snapshotID, path string) (api.TreeListResultPayload, error) {
	started := time.Now()
	key := treeCacheKey{
		SessionID:  sessionID,
		HostID:     hostID,
		SnapshotID: snapshotID,
		Path:       path,
	}
	if hit, found := s.treeCache.Get(key, started); found {
		return hit, nil
	}

	req := api.TreeListRequestPayload{SnapshotID: snapshotID, Path: path}
	env, err := s.deps.Hub.SendRPC(ctx, hostID, api.MsgTreeList, req, 30*time.Second)
	if err != nil {
		return api.TreeListResultPayload{}, err
	}

	var listing api.TreeListResultPayload
	if err := env.UnmarshalPayload(&listing); err != nil {
		return api.TreeListResultPayload{}, err
	}
	if listing.Error != "" {
		// Agent reported a failure: surface it, but don't cache it.
		return listing, nil
	}
	s.treeCache.Put(key, listing, started)
	return listing, nil
}
+146
View File
@@ -0,0 +1,146 @@
// tree_rpc_test.go — full round-trip test for the tree.list synchronous
// RPC (P3-X2). A fake agent reads the inbound tree.list, replies with a
// canned tree.list.result, and we assert the server's SendRPC returned
// the expected payload.
package http
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/coder/websocket"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// TestSendRPCTreeListRoundTrip drives a full tree.list exchange over a
// real WebSocket: the server calls SendRPC, a fake agent answers the
// request with a canned tree.list.result carrying the same envelope
// ID, and the test asserts that SendRPC hands back that payload.
func TestSendRPCTreeListRoundTrip(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "rpc-host")

	c := agentDial(t, srv, ts, hostID, token)
	sendHello(t, c, "rpc-host")
	_ = drainUntil(t, c, api.MsgScheduleSet)

	// fakeAgent reads inbound envelopes until it sees a tree.list,
	// answers it once with a canned result, and returns. Every failure
	// (including encoding the reply) is returned so the test can
	// report it; anything that isn't a text tree.list frame is skipped.
	fakeAgent := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		for {
			mt, raw, err := c.Read(ctx)
			if err != nil {
				return err
			}
			if mt != websocket.MessageText {
				continue
			}
			var env api.Envelope
			if err := json.Unmarshal(raw, &env); err != nil {
				return err
			}
			if env.Type != api.MsgTreeList {
				continue
			}
			var req api.TreeListRequestPayload
			if err := env.UnmarshalPayload(&req); err != nil {
				return err
			}
			result := api.TreeListResultPayload{
				SnapshotID: req.SnapshotID,
				Path:       req.Path,
				Entries: []api.TreeListEntry{
					{Name: "etc", Type: "dir"},
					{Name: "var", Type: "dir"},
				},
			}
			// Reusing env.ID is what lets the server correlate the reply.
			out, err := api.Marshal(api.MsgTreeListResult, env.ID, result)
			if err != nil {
				return err
			}
			rawOut, err := json.Marshal(out)
			if err != nil {
				return err
			}
			return c.Write(ctx, websocket.MessageText, rawOut)
		}
	}

	// Buffered so the agent goroutine can always finish, even if the
	// test has already failed and nobody is receiving.
	done := make(chan error, 1)
	go func() { done <- fakeAgent() }()

	// Server-side SendRPC.
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()
	reply, err := srv.deps.Hub.SendRPC(ctx, hostID, api.MsgTreeList,
		api.TreeListRequestPayload{SnapshotID: "f3a7b2c1", Path: "/"},
		3*time.Second)
	if err != nil {
		t.Fatalf("SendRPC: %v", err)
	}
	if reply.Type != api.MsgTreeListResult {
		t.Fatalf("reply type: got %q want %q", reply.Type, api.MsgTreeListResult)
	}
	var result api.TreeListResultPayload
	if err := reply.UnmarshalPayload(&result); err != nil {
		t.Fatalf("unmarshal reply: %v", err)
	}
	if result.SnapshotID != "f3a7b2c1" || result.Path != "/" {
		t.Fatalf("payload: got %+v", result)
	}
	if len(result.Entries) != 2 || result.Entries[0].Name != "etc" {
		t.Fatalf("entries: %+v", result.Entries)
	}

	// Make sure the fake agent didn't error out.
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("fake agent: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("fake agent didn't finish")
	}
}
// TestSendRPCTimeoutNoReply checks the no-answer path: the fake agent
// consumes the request but never replies, and SendRPC must come back
// with an error in roughly the requested 300ms instead of hanging.
func TestSendRPCTimeoutNoReply(t *testing.T) {
	t.Parallel()
	srv, ts, st := rawTestServer(t)
	hostID, token := enrolHostForWS(t, srv, st, "rpc-timeout-host")

	conn := agentDial(t, srv, ts, hostID, token)
	sendHello(t, conn, "rpc-timeout-host")
	_ = drainUntil(t, conn, api.MsgScheduleSet)

	// Silent agent: keep reading so the server's write completes, but
	// send nothing back. The read context bounds the goroutine's life.
	go func() {
		readCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
		defer stop()
		for {
			_, _, err := conn.Read(readCtx)
			if err != nil {
				return
			}
		}
	}()

	began := time.Now()
	_, err := srv.deps.Hub.SendRPC(context.Background(), hostID, api.MsgTreeList,
		api.TreeListRequestPayload{SnapshotID: "x", Path: "/"},
		300*time.Millisecond)
	if err == nil {
		t.Fatal("expected timeout error")
	}

	// Generous upper bound so a loaded CI box doesn't flake.
	took := time.Since(began)
	withinWindow := took >= 250*time.Millisecond && took <= 2*time.Second
	if !withinWindow {
		t.Fatalf("timeout took %s, expected ~300ms", took)
	}
}
+14
View File
@@ -297,6 +297,20 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
// (job.started → job.finished) is sufficient signal.
slog.Debug("ws msg not yet handled", "type", env.Type, "host_id", hostID)
case api.MsgTreeListResult:
// Reply to a synchronous tree.list RPC. Route to the waiter
// registered against the request envelope's ID; if none is
// registered the caller already gave up (ctx expired) — drop
// the stray reply quietly.
if env.ID == "" {
slog.Warn("ws: tree.list.result missing envelope ID", "host_id", hostID)
break
}
if !deps.Hub.rpcs.resolve(env.ID, env) {
slog.Debug("ws: tree.list.result with no waiter (timeout?)",
"id", env.ID, "host_id", hostID)
}
case api.MsgError:
var ep api.ErrorPayload
_ = env.UnmarshalPayload(&ep)
+5
View File
@@ -21,6 +21,11 @@ import (
type Hub struct {
	// NOTE(review): mu presumably guards conns — confirm against the
	// Hub methods, which are not shown alongside this struct.
	mu    sync.RWMutex
	conns map[string]*Conn // hostID → conn

	// rpcs tracks in-flight synchronous RPC calls (e.g. tree.list).
	// See rpc.go for details. Lazy-initialized via the registry's
	// own register() so callers don't have to juggle a constructor:
	// the zero value is usable because register allocates the map on
	// first use, while resolve/release only read from or delete from
	// it, both of which are safe on a nil map.
	rpcs rpcRegistry
}
// NewHub returns an empty hub.
+112
View File
@@ -0,0 +1,112 @@
package ws
import (
"context"
"errors"
"sync"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// rpcRegistry holds in-flight synchronous RPC calls. SendRPC registers
// a channel keyed by the request envelope's ID; the WS read loop's
// dispatcher routes incoming reply envelopes to the matching channel
// when their type is one of the known reply types (currently just
// tree.list.result).
//
// Each Hub carries one registry, keyed purely by envelope ID. That is
// fine because IDs are ULIDs — globally unique without coordinating
// across hubs. The zero value is ready to use.
type rpcRegistry struct {
	// mu guards pending.
	mu sync.Mutex
	// pending maps a request envelope ID to the channel its caller is
	// waiting on. It stays nil until the first register call.
	pending map[string]chan api.Envelope
}
// register creates the waiter channel for request id, records it in
// the registry (allocating the map on first use) and returns it. The
// channel has capacity 1 so that resolve can hand over the reply
// without blocking the read loop's dispatcher on a slow waiter.
func (r *rpcRegistry) register(id string) chan api.Envelope {
	waiter := make(chan api.Envelope, 1)

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.pending == nil {
		r.pending = map[string]chan api.Envelope{}
	}
	r.pending[id] = waiter
	return waiter
}
// resolve hands env to the waiter registered under id and forgets the
// entry. The return value says whether a waiter existed; the
// dispatcher uses it to decide whether to log a stray reply.
func (r *rpcRegistry) resolve(id string, env api.Envelope) bool {
	r.mu.Lock()
	waiter, found := r.pending[id]
	// Deleting a missing key is a no-op, so no need to branch on found.
	delete(r.pending, id)
	r.mu.Unlock()

	if found {
		// The entry was removed under the lock, so this is the only
		// send the channel will ever see and the cap-1 buffer
		// guarantees it does not block.
		waiter <- env
		close(waiter)
	}
	return found
}
// release forgets the waiter for id without delivering anything. It
// is used when the caller stops waiting before a reply arrives; a
// reply that turns up afterwards finds no waiter in resolve and is
// simply dropped. Releasing an unknown id is harmless.
func (r *rpcRegistry) release(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.pending, id)
}
// SendRPC sends a request envelope to the host and blocks until a
// matching reply lands, the context expires, or timeout elapses. The
// hub picks a fresh envelope ID, marshals the payload, registers a
// waiter, and sends.
//
// timeout caps the wait after the request has been sent; values <= 0
// fall back to 30s, which covers a slow network round-trip plus a
// restic ls invocation against a remote rest-server. When the wait is
// abandoned (timeout or ctx) the registry entry is removed right away,
// so a reply that arrives later finds no waiter and is dropped by the
// dispatcher.
//
// Returns the reply envelope on success. Errors are: the marshal
// error, the error from Hub.Send, ctx.Err() when the context ends
// first, or a "timed out" error when timeout elapses first.
//
// If the host disconnects mid-flight, the read loop ends and no reply
// will ever come — the caller's ctx.Done()/timeout is the only path
// out. We could pre-fail by tracking conn lifetime, but the bound
// keeps the code simple and the worst case is a 30s wait.
func (h *Hub) SendRPC(ctx context.Context, hostID string, reqType api.MessageType, payload any, timeout time.Duration) (api.Envelope, error) {
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	id := ulid.Make().String()
	env, err := api.Marshal(reqType, id, payload)
	if err != nil {
		return api.Envelope{}, err
	}

	// Register before sending so a fast reply can't beat the waiter.
	ch := h.rpcs.register(id)
	// One deferred release covers every exit. After a successful
	// resolve the entry is already gone and this is a no-op.
	defer h.rpcs.release(id)

	if err := h.Send(ctx, hostID, env); err != nil {
		return api.Envelope{}, err
	}

	// An explicit timer (rather than time.After) lets us stop it as
	// soon as the reply arrives instead of leaving it pending for the
	// remainder of the timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case reply := <-ch:
		return reply, nil
	case <-ctx.Done():
		return api.Envelope{}, ctx.Err()
	case <-timer.C:
		return api.Envelope{}, errors.New("ws rpc: timed out waiting for reply")
	}
}
+122
View File
@@ -0,0 +1,122 @@
package ws
import (
"context"
"encoding/json"
"sync"
"testing"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// TestRPCRegistryRoundTrip registers a waiter, resolves it and reads
// the envelope back, then confirms that resolving the same id a second
// time finds no waiter and reports false without panicking.
func TestRPCRegistryRoundTrip(t *testing.T) {
	t.Parallel()
	var reg rpcRegistry
	reqID := ulid.Make().String()
	waiter := reg.register(reqID)

	reply := api.Envelope{
		Type:    api.MsgTreeListResult,
		ID:      reqID,
		Payload: json.RawMessage(`{"path":"/"}`),
	}
	if delivered := reg.resolve(reqID, reply); !delivered {
		t.Fatal("resolve: returned false for registered id")
	}
	if got := <-waiter; got.ID != reqID {
		t.Fatalf("id mismatch: got %q want %q", got.ID, reqID)
	}

	// The entry is gone now, so a repeat must be a harmless miss.
	if reg.resolve(reqID, reply) {
		t.Fatal("resolve: returned true for already-resolved id")
	}
}
// TestRPCRegistryRelease verifies that once a waiter has been
// released, a later resolve for the same id finds nothing and reports
// false.
func TestRPCRegistryRelease(t *testing.T) {
	t.Parallel()
	var reg rpcRegistry
	reqID := ulid.Make().String()

	_ = reg.register(reqID)
	reg.release(reqID)

	delivered := reg.resolve(reqID, api.Envelope{ID: reqID})
	if delivered {
		t.Fatal("resolve after release: should be no-op")
	}
}
// TestRPCRegistryConcurrent registers many waiters, resolves them all
// from separate goroutines, and checks that every waiter receives the
// envelope carrying its own id. This catches buggy keying/locking.
func TestRPCRegistryConcurrent(t *testing.T) {
	t.Parallel()
	const waiters = 64
	var reg rpcRegistry

	ids := make([]string, waiters)
	chans := make([]chan api.Envelope, waiters)
	for i := range ids {
		ids[i] = ulid.Make().String()
		chans[i] = reg.register(ids[i])
	}

	// One resolver goroutine per waiter; scheduling decides the order.
	var wg sync.WaitGroup
	wg.Add(waiters)
	for _, id := range ids {
		go func(id string) {
			defer wg.Done()
			reg.resolve(id, api.Envelope{ID: id, Type: api.MsgTreeListResult})
		}(id)
	}
	wg.Wait()

	for i, ch := range chans {
		select {
		case env := <-ch:
			if env.ID != ids[i] {
				t.Fatalf("waiter %d: got id %q want %q", i, env.ID, ids[i])
			}
		case <-time.After(2 * time.Second):
			t.Fatalf("waiter %d: timed out", i)
		}
	}
}
// TestSendRPCContextCancelReleases reproduces, at the registry level,
// what SendRPC does when the caller's ctx is cancelled: the waiter is
// released, and a reply arriving afterwards finds no entry and is
// dropped. SendRPC itself is not invoked — see the note in the body.
func TestSendRPCContextCancelReleases(t *testing.T) {
	t.Parallel()
	hub := NewHub()

	// With no host registered, Hub.Send is expected to fail ("host
	// offline"), so SendRPC would return before ever waiting. The
	// ctx-cancel path is therefore exercised through register()
	// directly.
	reqID := ulid.Make().String()
	waiter := hub.rpcs.register(reqID)

	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(20*time.Millisecond, cancel)

	// Same shape as SendRPC's select; cancellation has to win because
	// nothing ever resolves the waiter.
	select {
	case <-waiter:
		t.Fatal("unexpected reply")
	case <-ctx.Done():
		hub.rpcs.release(reqID)
	}

	// A late reply must be reported as undelivered: the entry is gone,
	// so resolve returns false without touching the channel.
	if hub.rpcs.resolve(reqID, api.Envelope{ID: reqID}) {
		t.Fatal("resolve after release should return false")
	}
}