server: P2-18b pending WS + admin accept/reject
GET /ws/agent/pending?pending_id=… runs an Ed25519 nonce-sign
handshake against the row's stored public key, then holds the
connection open. POST /api/pending-hosts/{id}/accept (admin)
mints a real Host row + bearer + AEAD-encrypted repo creds, pushes
the bearer down the open WS, deletes the pending row, and writes
a host.accept_pending audit entry. POST /api/pending-hosts/{id}/reject
closes the socket with code 4001 and audit-logs host.reject_pending.
In-memory pendingHub keyed by pending_id wires accept/reject to
their live socket.
This commit is contained in:
@@ -0,0 +1,349 @@
|
||||
// pending_ws.go — /ws/agent/pending and the admin accept/reject
|
||||
// endpoints for the announce-and-approve enrolment flow (P2-18b).
|
||||
//
|
||||
// Flow:
|
||||
// 1. Agent has previously called POST /api/agents/announce, which
|
||||
// returned its pending_id + fingerprint. Agent persists the
|
||||
// keypair locally.
|
||||
// 2. Agent connects to /ws/agent/pending?pending_id=… (no auth).
|
||||
// Server reads the row, generates a 32-byte nonce, sends it.
|
||||
// 3. Agent signs the nonce with its Ed25519 private key, sends the
|
||||
// signature back. Server verifies; close on bad sig.
|
||||
// 4. The connection sits open; the agent reads but doesn't write.
|
||||
// 5. Admin clicks Accept: POST /api/pending-hosts/{id}/accept with
|
||||
// the same repo-creds form the token-mint flow uses. Server
|
||||
// mints a Host row + bearer + encrypted creds, pushes one
|
||||
// `enrolled` message down the open socket, closes cleanly.
|
||||
// 6. Admin clicks Reject: socket closes with code 4001.
|
||||
//
|
||||
// Hub: a process-local in-memory map of pending_id → live conn so
|
||||
// the accept/reject handlers can find the right socket. Sole
|
||||
// instance lives on Server.pendingHub.
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
stdhttp "net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||
)
|
||||
|
||||
// pendingConn is a single live /ws/agent/pending session. The accept
|
||||
// handler sends the enrolment message via Send and closes the socket;
|
||||
// the WS read loop is just waiting for that close.
|
||||
type pendingConn struct {
|
||||
conn *websocket.Conn
|
||||
pendingID string
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// pendingHub is the in-memory map of pending_id → live socket.
|
||||
type pendingHub struct {
|
||||
mu sync.Mutex
|
||||
conns map[string]*pendingConn
|
||||
}
|
||||
|
||||
func newPendingHub() *pendingHub {
|
||||
return &pendingHub{conns: map[string]*pendingConn{}}
|
||||
}
|
||||
|
||||
func (h *pendingHub) register(pc *pendingConn) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
// Replace any existing socket for the same pending_id (an agent
|
||||
// reconnected) — close the old one cleanly first so its goroutine
|
||||
// can exit.
|
||||
if old, ok := h.conns[pc.pendingID]; ok {
|
||||
_ = old.conn.Close(websocket.StatusNormalClosure, "superseded")
|
||||
close(old.closed)
|
||||
}
|
||||
h.conns[pc.pendingID] = pc
|
||||
}
|
||||
|
||||
func (h *pendingHub) unregister(pendingID string, pc *pendingConn) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if cur, ok := h.conns[pendingID]; ok && cur == pc {
|
||||
delete(h.conns, pendingID)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *pendingHub) get(pendingID string) *pendingConn {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.conns[pendingID]
|
||||
}
|
||||
|
||||
// nonceMessage is what the server sends first on /ws/agent/pending.
|
||||
type nonceMessage struct {
|
||||
Type string `json:"type"` // "nonce"
|
||||
Nonce string `json:"nonce"` // base64
|
||||
}
|
||||
|
||||
// signedNonceMessage is what the agent sends back.
|
||||
type signedNonceMessage struct {
|
||||
Type string `json:"type"` // "signed_nonce"
|
||||
Signature string `json:"signature"` // base64
|
||||
}
|
||||
|
||||
// enrolledMessage is what the server sends on accept. The agent
|
||||
// persists the bearer to agent.yaml and exits announce mode.
|
||||
type enrolledMessage struct {
|
||||
Type string `json:"type"` // "enrolled"
|
||||
HostID string `json:"host_id"`
|
||||
Bearer string `json:"bearer"`
|
||||
ServerID string `json:"server_id,omitempty"`
|
||||
}
|
||||
|
||||
// handlePendingWS upgrades the WS, runs the nonce-sign handshake,
|
||||
// registers the conn in the hub, and blocks until the conn is
|
||||
// closed (by accept/reject or by the agent disconnecting).
|
||||
func (s *Server) handlePendingWS(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
||||
pendingID := r.URL.Query().Get("pending_id")
|
||||
if pendingID == "" {
|
||||
stdhttp.Error(w, "missing pending_id", stdhttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID)
|
||||
if err != nil {
|
||||
stdhttp.Error(w, "pending host not found", stdhttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if time.Now().UTC().After(row.ExpiresAt) {
|
||||
stdhttp.Error(w, "pending host expired", stdhttp.StatusGone)
|
||||
return
|
||||
}
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
// Same-origin defaults are safe: the agent isn't a browser.
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("pending ws: accept", "pending_id", pendingID, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Generate + send nonce.
|
||||
nonce := make([]byte, 32)
|
||||
if _, err := rand.Read(nonce); err != nil {
|
||||
_ = conn.Close(websocket.StatusInternalError, "nonce gen")
|
||||
return
|
||||
}
|
||||
nm := nonceMessage{Type: "nonce", Nonce: base64.StdEncoding.EncodeToString(nonce)}
|
||||
raw, _ := json.Marshal(nm)
|
||||
wctx, wcancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||
if err := conn.Write(wctx, websocket.MessageText, raw); err != nil {
|
||||
wcancel()
|
||||
_ = conn.Close(websocket.StatusInternalError, "send nonce")
|
||||
return
|
||||
}
|
||||
wcancel()
|
||||
|
||||
// Read signed nonce back.
|
||||
rctx, rcancel := context.WithTimeout(r.Context(), 30*time.Second)
|
||||
mt, body, err := conn.Read(rctx)
|
||||
rcancel()
|
||||
if err != nil || mt != websocket.MessageText {
|
||||
_ = conn.Close(websocket.StatusPolicyViolation, "no signed nonce")
|
||||
return
|
||||
}
|
||||
var sig signedNonceMessage
|
||||
if err := json.Unmarshal(body, &sig); err != nil || sig.Type != "signed_nonce" {
|
||||
_ = conn.Close(websocket.StatusPolicyViolation, "bad signed nonce shape")
|
||||
return
|
||||
}
|
||||
sigBytes, err := base64.StdEncoding.DecodeString(sig.Signature)
|
||||
if err != nil {
|
||||
_ = conn.Close(websocket.StatusPolicyViolation, "bad signature b64")
|
||||
return
|
||||
}
|
||||
if !ed25519.Verify(row.PublicKey, nonce, sigBytes) {
|
||||
_ = conn.Close(websocket.StatusPolicyViolation, "signature does not verify")
|
||||
return
|
||||
}
|
||||
|
||||
// Touch the row so the dashboard knows the agent is live.
|
||||
_ = s.deps.Store.TouchPendingHost(context.Background(), pendingID, time.Now().UTC())
|
||||
|
||||
// Register and block until close.
|
||||
pc := &pendingConn{conn: conn, pendingID: pendingID, closed: make(chan struct{})}
|
||||
s.pendingHub.register(pc)
|
||||
defer s.pendingHub.unregister(pendingID, pc)
|
||||
|
||||
// Read loop: we don't expect any further frames from the agent.
|
||||
// If the agent closes, we exit.
|
||||
go func() {
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
|
||||
_, _, err := conn.Read(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
close(pc.closed)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
<-pc.closed
|
||||
}
|
||||
|
||||
// acceptForm is the admin form for POST /api/pending-hosts/{id}/accept.
|
||||
// repo_password may be omitted only when the host already has admin-
|
||||
// supplied creds elsewhere — we don't currently model that. For now,
|
||||
// require all three.
|
||||
type acceptForm struct {
|
||||
RepoURL string `json:"repo_url"`
|
||||
RepoUsername string `json:"repo_username"`
|
||||
RepoPassword string `json:"repo_password"`
|
||||
}
|
||||
|
||||
// handleAcceptPendingHost mints a real Host row + bearer + encrypted
|
||||
// repo creds and pushes the bearer down the agent's open pending WS.
|
||||
// Admin-auth required.
|
||||
func (s *Server) handleAcceptPendingHost(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
||||
user, ok := s.requireUser(r)
|
||||
if !ok {
|
||||
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
|
||||
return
|
||||
}
|
||||
pendingID := chi.URLParam(r, "id")
|
||||
row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID)
|
||||
if err != nil {
|
||||
writeJSONError(w, stdhttp.StatusNotFound, "pending_not_found", "")
|
||||
return
|
||||
}
|
||||
pc := s.pendingHub.get(pendingID)
|
||||
if pc == nil {
|
||||
writeJSONError(w, stdhttp.StatusConflict, "agent_not_connected",
|
||||
"the pending agent is not currently connected; ask it to retry")
|
||||
return
|
||||
}
|
||||
|
||||
var form acceptForm
|
||||
// Accept either JSON or form-urlencoded so HTMX-style POST works.
|
||||
if r.Header.Get("Content-Type") == "application/json" {
|
||||
if err := json.NewDecoder(r.Body).Decode(&form); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error())
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "bad_form", err.Error())
|
||||
return
|
||||
}
|
||||
form.RepoURL = r.PostForm.Get("repo_url")
|
||||
form.RepoUsername = r.PostForm.Get("repo_username")
|
||||
form.RepoPassword = r.PostForm.Get("repo_password")
|
||||
}
|
||||
if form.RepoURL == "" || form.RepoPassword == "" {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "missing_field",
|
||||
"repo_url and repo_password are required")
|
||||
return
|
||||
}
|
||||
|
||||
// Mint persistent bearer + Host row.
|
||||
hostID := ulid.Make().String()
|
||||
token, err := auth.NewToken()
|
||||
if err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
host := store.Host{
|
||||
ID: hostID, Name: row.Hostname, OS: row.OS, Arch: row.Arch,
|
||||
AgentVersion: row.AgentVersion, ResticVersion: row.ResticVersion,
|
||||
EnrolledAt: time.Now().UTC(),
|
||||
}
|
||||
if err := s.deps.Store.CreateHost(r.Context(), host, auth.HashToken(token), ""); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
// Encrypt + persist repo creds.
|
||||
enc, err := s.encryptRepoCreds(repoCredsBlob(form), []byte("host:"+hostID))
|
||||
if err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.deps.Store.SetHostCredentials(r.Context(), hostID, store.CredKindRepo, enc); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
// Drop the pending row.
|
||||
if err := s.deps.Store.DeletePendingHost(r.Context(), pendingID); err != nil {
|
||||
slog.Warn("accept pending: delete row", "pending_id", pendingID, "err", err)
|
||||
}
|
||||
// Push enrolled message + close the pending WS.
|
||||
enrolled := enrolledMessage{Type: "enrolled", HostID: hostID, Bearer: token}
|
||||
raw, _ := json.Marshal(enrolled)
|
||||
wctx, wcancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||
if err := pc.conn.Write(wctx, websocket.MessageText, raw); err != nil {
|
||||
slog.Warn("accept pending: write enrolled", "pending_id", pendingID, "err", err)
|
||||
}
|
||||
wcancel()
|
||||
_ = pc.conn.Close(websocket.StatusNormalClosure, "accepted")
|
||||
|
||||
// Audit.
|
||||
uid := user.ID
|
||||
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
|
||||
ID: ulid.Make().String(),
|
||||
UserID: &uid,
|
||||
Actor: "user",
|
||||
Action: "host.accept_pending",
|
||||
TargetKind: ptr("host"),
|
||||
TargetID: &hostID,
|
||||
TS: time.Now().UTC(),
|
||||
})
|
||||
|
||||
writeJSON(w, stdhttp.StatusOK, map[string]any{
|
||||
"host_id": hostID,
|
||||
"fingerprint": row.Fingerprint,
|
||||
})
|
||||
}
|
||||
|
||||
// handleRejectPendingHost deletes the pending row and closes any
|
||||
// open WS for it. Admin-auth required.
|
||||
func (s *Server) handleRejectPendingHost(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
||||
user, ok := s.requireUser(r)
|
||||
if !ok {
|
||||
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
|
||||
return
|
||||
}
|
||||
pendingID := chi.URLParam(r, "id")
|
||||
row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
w.WriteHeader(stdhttp.StatusNoContent)
|
||||
return
|
||||
}
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
if pc := s.pendingHub.get(pendingID); pc != nil {
|
||||
_ = pc.conn.Close(4001, "rejected")
|
||||
}
|
||||
if err := s.deps.Store.DeletePendingHost(r.Context(), pendingID); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error())
|
||||
return
|
||||
}
|
||||
uid := user.ID
|
||||
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
|
||||
ID: ulid.Make().String(),
|
||||
UserID: &uid,
|
||||
Actor: "user",
|
||||
Action: "host.reject_pending",
|
||||
TargetKind: ptr("pending_host"),
|
||||
TargetID: &row.ID,
|
||||
TS: time.Now().UTC(),
|
||||
})
|
||||
w.WriteHeader(stdhttp.StatusNoContent)
|
||||
}
|
||||
Reference in New Issue
Block a user