// pending_ws.go — /ws/agent/pending and the admin accept/reject
// endpoints for the announce-and-approve enrolment flow (P2-18b).
//
// Flow:
//  1. Agent has previously called POST /api/agents/announce, which
//     returned its pending_id + fingerprint. Agent persists the
//     keypair locally.
//  2. Agent connects to /ws/agent/pending?pending_id=… (no auth).
//     Server reads the row, generates a 32-byte nonce, sends it.
//  3. Agent signs the nonce with its Ed25519 private key, sends the
//     signature back. Server verifies; close on bad sig.
//  4. The connection sits open; the agent reads but doesn't write.
//  5. Admin clicks Accept: POST /api/pending-hosts/{id}/accept with
//     the same repo-creds form the token-mint flow uses. Server
//     mints a Host row + bearer + encrypted creds, pushes one
//     `enrolled` message down the open socket, closes cleanly.
//  6. Admin clicks Reject: socket closes with code 4001.
//
// Hub: a process-local in-memory map of pending_id → live conn so
// the accept/reject handlers can find the right socket. Sole
// instance lives on Server.pendingHub.
package http

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"log/slog"
	stdhttp "net/http"
	"sync"
	"time"

	"github.com/coder/websocket"
	"github.com/go-chi/chi/v5"
	"github.com/oklog/ulid/v2"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)

// pendingConn is a single live /ws/agent/pending session. The accept
// handler writes the enrolment message directly on conn and then
// closes the socket; the reject handler closes it with code 4001.
// The WS handler that created the session stays blocked on closed
// until the session is over.
type pendingConn struct {
	// conn is the upgraded agent socket.
	conn *websocket.Conn
	// pendingID is the pending_id the agent connected with; it is the
	// key this session is stored under in the hub.
	pendingID string
	// closed is the signal channel handlePendingWS blocks on.
	closed chan struct{}
}

// pendingHub is the in-memory map of pending_id → live socket.
type pendingHub struct { mu sync.Mutex conns map[string]*pendingConn } func newPendingHub() *pendingHub { return &pendingHub{conns: map[string]*pendingConn{}} } func (h *pendingHub) register(pc *pendingConn) { h.mu.Lock() defer h.mu.Unlock() // Replace any existing socket for the same pending_id (an agent // reconnected) — close the old one cleanly first so its goroutine // can exit. if old, ok := h.conns[pc.pendingID]; ok { _ = old.conn.Close(websocket.StatusNormalClosure, "superseded") close(old.closed) } h.conns[pc.pendingID] = pc } func (h *pendingHub) unregister(pendingID string, pc *pendingConn) { h.mu.Lock() defer h.mu.Unlock() if cur, ok := h.conns[pendingID]; ok && cur == pc { delete(h.conns, pendingID) } } func (h *pendingHub) get(pendingID string) *pendingConn { h.mu.Lock() defer h.mu.Unlock() return h.conns[pendingID] } // nonceMessage is what the server sends first on /ws/agent/pending. type nonceMessage struct { Type string `json:"type"` // "nonce" Nonce string `json:"nonce"` // base64 } // signedNonceMessage is what the agent sends back. type signedNonceMessage struct { Type string `json:"type"` // "signed_nonce" Signature string `json:"signature"` // base64 } // enrolledMessage is what the server sends on accept. The agent // persists the bearer to agent.yaml and exits announce mode. type enrolledMessage struct { Type string `json:"type"` // "enrolled" HostID string `json:"host_id"` Bearer string `json:"bearer"` ServerID string `json:"server_id,omitempty"` } // handlePendingWS upgrades the WS, runs the nonce-sign handshake, // registers the conn in the hub, and blocks until the conn is // closed (by accept/reject or by the agent disconnecting). 
func (s *Server) handlePendingWS(w stdhttp.ResponseWriter, r *stdhttp.Request) { pendingID := r.URL.Query().Get("pending_id") if pendingID == "" { stdhttp.Error(w, "missing pending_id", stdhttp.StatusBadRequest) return } row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID) if err != nil { stdhttp.Error(w, "pending host not found", stdhttp.StatusNotFound) return } if time.Now().UTC().After(row.ExpiresAt) { stdhttp.Error(w, "pending host expired", stdhttp.StatusGone) return } conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ // Same-origin defaults are safe: the agent isn't a browser. InsecureSkipVerify: true, }) if err != nil { slog.Warn("pending ws: accept", "pending_id", pendingID, "err", err) return } // Generate + send nonce. nonce := make([]byte, 32) if _, err := rand.Read(nonce); err != nil { _ = conn.Close(websocket.StatusInternalError, "nonce gen") return } nm := nonceMessage{Type: "nonce", Nonce: base64.StdEncoding.EncodeToString(nonce)} raw, _ := json.Marshal(nm) wctx, wcancel := context.WithTimeout(r.Context(), 5*time.Second) if err := conn.Write(wctx, websocket.MessageText, raw); err != nil { wcancel() _ = conn.Close(websocket.StatusInternalError, "send nonce") return } wcancel() // Read signed nonce back. 
rctx, rcancel := context.WithTimeout(r.Context(), 30*time.Second) mt, body, err := conn.Read(rctx) rcancel() if err != nil || mt != websocket.MessageText { _ = conn.Close(websocket.StatusPolicyViolation, "no signed nonce") return } var sig signedNonceMessage if err := json.Unmarshal(body, &sig); err != nil || sig.Type != "signed_nonce" { _ = conn.Close(websocket.StatusPolicyViolation, "bad signed nonce shape") return } sigBytes, err := base64.StdEncoding.DecodeString(sig.Signature) if err != nil { _ = conn.Close(websocket.StatusPolicyViolation, "bad signature b64") return } if !ed25519.Verify(row.PublicKey, nonce, sigBytes) { _ = conn.Close(websocket.StatusPolicyViolation, "signature does not verify") return } // Touch the row so the dashboard knows the agent is live. _ = s.deps.Store.TouchPendingHost(context.Background(), pendingID, time.Now().UTC()) // Register and block until close. pc := &pendingConn{conn: conn, pendingID: pendingID, closed: make(chan struct{})} s.pendingHub.register(pc) defer s.pendingHub.unregister(pendingID, pc) // Read loop: we don't expect any further frames from the agent. // If the agent closes, we exit. go func() { for { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) _, _, err := conn.Read(ctx) cancel() if err != nil { close(pc.closed) return } } }() <-pc.closed } // acceptForm is the admin form for POST /api/pending-hosts/{id}/accept. // repo_password may be omitted only when the host already has admin- // supplied creds elsewhere — we don't currently model that. For now, // require all three. type acceptForm struct { RepoURL string `json:"repo_url"` RepoUsername string `json:"repo_username"` RepoPassword string `json:"repo_password"` } // handleAcceptPendingHost mints a real Host row + bearer + encrypted // repo creds and pushes the bearer down the agent's open pending WS. // Admin-auth required. 
func (s *Server) handleAcceptPendingHost(w stdhttp.ResponseWriter, r *stdhttp.Request) { user, ok := s.requireUser(r) if !ok { writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") return } pendingID := chi.URLParam(r, "id") row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID) if err != nil { writeJSONError(w, stdhttp.StatusNotFound, "pending_not_found", "") return } pc := s.pendingHub.get(pendingID) if pc == nil { writeJSONError(w, stdhttp.StatusConflict, "agent_not_connected", "the pending agent is not currently connected; ask it to retry") return } var form acceptForm // Accept either JSON or form-urlencoded so HTMX-style POST works. if r.Header.Get("Content-Type") == "application/json" { if err := json.NewDecoder(r.Body).Decode(&form); err != nil { writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) return } } else { if err := r.ParseForm(); err != nil { writeJSONError(w, stdhttp.StatusBadRequest, "bad_form", err.Error()) return } form.RepoURL = r.PostForm.Get("repo_url") form.RepoUsername = r.PostForm.Get("repo_username") form.RepoPassword = r.PostForm.Get("repo_password") } if form.RepoURL == "" || form.RepoPassword == "" { writeJSONError(w, stdhttp.StatusBadRequest, "missing_field", "repo_url and repo_password are required") return } // Mint persistent bearer + Host row. hostID := ulid.Make().String() token, err := auth.NewToken() if err != nil { writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } host := store.Host{ ID: hostID, Name: row.Hostname, OS: row.OS, Arch: row.Arch, AgentVersion: row.AgentVersion, ResticVersion: row.ResticVersion, EnrolledAt: time.Now().UTC(), } if err := s.deps.Store.CreateHost(r.Context(), host, auth.HashToken(token), ""); err != nil { writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } // Encrypt + persist repo creds. 
enc, err := s.encryptRepoCreds(repoCredsBlob(form), []byte("host:"+hostID)) if err != nil { writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } if err := s.deps.Store.SetHostCredentials(r.Context(), hostID, store.CredKindRepo, enc); err != nil { writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } // Drop the pending row. if err := s.deps.Store.DeletePendingHost(r.Context(), pendingID); err != nil { slog.Warn("accept pending: delete row", "pending_id", pendingID, "err", err) } // Push enrolled message + close the pending WS. enrolled := enrolledMessage{Type: "enrolled", HostID: hostID, Bearer: token} raw, _ := json.Marshal(enrolled) wctx, wcancel := context.WithTimeout(r.Context(), 5*time.Second) if err := pc.conn.Write(wctx, websocket.MessageText, raw); err != nil { slog.Warn("accept pending: write enrolled", "pending_id", pendingID, "err", err) } wcancel() _ = pc.conn.Close(websocket.StatusNormalClosure, "accepted") // Audit. uid := user.ID _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ ID: ulid.Make().String(), UserID: &uid, Actor: "user", Action: "host.accept_pending", TargetKind: ptr("host"), TargetID: &hostID, TS: time.Now().UTC(), }) writeJSON(w, stdhttp.StatusOK, map[string]any{ "host_id": hostID, "fingerprint": row.Fingerprint, }) } // handleRejectPendingHost deletes the pending row and closes any // open WS for it. Admin-auth required. 
func (s *Server) handleRejectPendingHost(w stdhttp.ResponseWriter, r *stdhttp.Request) { user, ok := s.requireUser(r) if !ok { writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") return } pendingID := chi.URLParam(r, "id") row, err := s.deps.Store.GetPendingHost(r.Context(), pendingID) if err != nil { if errors.Is(err, store.ErrNotFound) { w.WriteHeader(stdhttp.StatusNoContent) return } writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } if pc := s.pendingHub.get(pendingID); pc != nil { _ = pc.conn.Close(4001, "rejected") } if err := s.deps.Store.DeletePendingHost(r.Context(), pendingID); err != nil { writeJSONError(w, stdhttp.StatusInternalServerError, "internal", err.Error()) return } uid := user.ID _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ ID: ulid.Make().String(), UserID: &uid, Actor: "user", Action: "host.reject_pending", TargetKind: ptr("pending_host"), TargetID: &row.ID, TS: time.Now().UTC(), }) w.WriteHeader(stdhttp.StatusNoContent) }