package ws import ( "context" "encoding/json" "sync" "testing" "time" "github.com/oklog/ulid/v2" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" ) // TestRPCRegistryRoundTrip: register a waiter, resolve it, get the // envelope back. Cover the no-waiter and double-resolve cases too. func TestRPCRegistryRoundTrip(t *testing.T) { t.Parallel() var r rpcRegistry id := ulid.Make().String() ch := r.register(id) want := api.Envelope{Type: api.MsgTreeListResult, ID: id, Payload: json.RawMessage(`{"path":"/"}`)} if !r.resolve(id, want) { t.Fatal("resolve: returned false for registered id") } got := <-ch if got.ID != id { t.Fatalf("id mismatch: got %q want %q", got.ID, id) } // A second resolve for the same id has no waiter and should not panic. if r.resolve(id, want) { t.Fatal("resolve: returned true for already-resolved id") } } // TestRPCRegistryRelease: release abandons the waiter; a subsequent // resolve is a no-op (no goroutine leak, no panic). func TestRPCRegistryRelease(t *testing.T) { t.Parallel() var r rpcRegistry id := ulid.Make().String() _ = r.register(id) r.release(id) if r.resolve(id, api.Envelope{ID: id}) { t.Fatal("resolve after release: should be no-op") } } // TestRPCRegistryConcurrent: many waiters in flight concurrently get // only their own reply. This catches buggy keying/locking. func TestRPCRegistryConcurrent(t *testing.T) { t.Parallel() var r rpcRegistry const n = 64 ids := make([]string, n) chs := make([]chan api.Envelope, n) for i := 0; i < n; i++ { ids[i] = ulid.Make().String() chs[i] = r.register(ids[i]) } // Resolve in random-ish order from many goroutines. var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(idx int) { defer wg.Done() r.resolve(ids[idx], api.Envelope{ID: ids[idx], Type: api.MsgTreeListResult}) }(i) } wg.Wait() for i := 0; i < n; i++ { select { case got := <-chs[i]: if got.ID != ids[i] { t.Fatalf("waiter %d: got id %q want %q", i, got.ID, ids[i]) } case <-time.After(2 * time.Second): t.Fatalf("waiter %d: timed out", i) } } } // TestSendRPCContextCancelReleases ensures that canceling the caller's // ctx releases the registry entry so a stray late reply is harmlessly // dropped. Skips if the hub isn't reachable for direct access — this // is purely a unit test on the registry path inside SendRPC. func TestSendRPCContextCancelReleases(t *testing.T) { t.Parallel() h := NewHub() // No host registered, so Hub.Send returns "host offline" and // SendRPC bails without ever waiting. We test the timeout/ctx // path by going through register() directly. id := ulid.Make().String() ch := h.rpcs.register(id) ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(20 * time.Millisecond) cancel() }() // Simulate the SendRPC select: ctx wins. select { case <-ch: t.Fatal("unexpected reply") case <-ctx.Done(): h.rpcs.release(id) } // Now a late reply should not block (ch is still open but no // receiver — buffered size 1 absorbs it). resolved := h.rpcs.resolve(id, api.Envelope{ID: id}) if resolved { t.Fatal("resolve after release should return false") } }