Files
restic-manager/internal/server/http/server.go
T

392 lines
16 KiB
Go

// Package http hosts the chi-based REST handlers for the control
// plane. The Server type owns the router, the handlers, and the
// graceful-shutdown lifecycle.
package http
import (
"context"
"errors"
stdhttp "net/http"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"gitea.dcglab.co.uk/steve/restic-manager/internal/alert"
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/metrics"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/oidc"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// Deps bundles every collaborator the HTTP server depends on. Wired up
// in cmd/server; tests pass a pared-down Deps with fakes.
//
// Several collaborators are optional: when one is nil, routes skips
// mounting the endpoints that need it (see the per-field notes).
type Deps struct {
	// Cfg is the server configuration. New uses Cfg.Listen as the
	// listen address, and routes mounts /metrics only when
	// Cfg.MetricsAuthEnabled() reports true.
	Cfg config.Config
	// Store is the persistence layer. It is also handed to the agent
	// WebSocket handler.
	Store *store.Store
	// AEAD is the authenticated-encryption helper. It is not referenced
	// in this file; presumably the credential handlers use it to
	// seal/open stored secrets — confirm.
	AEAD *crypto.AEAD
	// Hub tracks connected agents. When nil, the /ws/agent subtree is
	// not mounted at all.
	Hub *ws.Hub
	// JobHub carries live job output. It is passed to the agent
	// WebSocket handler, and /api/jobs/{id}/stream is mounted only when
	// it is non-nil.
	JobHub *ws.JobHub
	// UI renders the HTML pages. When nil, the page routes guarded by
	// `s.deps.UI != nil` are not mounted. A few form-post endpoints
	// (e.g. POST /logout and the HTMX repo-op variants) are registered
	// regardless.
	UI *ui.Renderer
	// AlertEngine (optional, wired in G1) receives job-finished and
	// host-online events from the WS handler. Nil until G1 constructs
	// the engine at boot.
	AlertEngine *alert.Engine
	// NotificationHub (optional, wired in G1) is used by the test-fire
	// endpoint to dispatch a single synthetic payload through a channel.
	NotificationHub *notification.Hub
	// UpdateWatcher tracks in-flight agent self-update dispatches and
	// reconciles them against incoming hello envelopes. Optional;
	// nil = no-op (handlers degrade by skipping the Track call).
	// NOTE(review): routes forwards it to the agent WebSocket handler
	// only when its dynamic type is *ws.UpdateWatcher; other
	// implementations are not wired into that handler.
	UpdateWatcher UpdateWatcher
	// FleetWorker drives the rolling fleet-update worker. Optional;
	// nil = fleet update endpoints (P6-15) report unavailable. May be
	// installed after New via Server.SetFleetWorker.
	FleetWorker FleetWorker
	// Version is the binary's build version, surfaced in the chrome.
	// Empty falls back to "dev".
	Version string
	// BootstrapToken (optional, populated only on first run) is the raw
	// admin-bootstrap token printed in the server logs. While set, the
	// /bootstrap endpoint accepts it to create the first admin user.
	BootstrapToken string
	// OIDC (optional). Non-nil when the operator has configured an
	// IdP — handlers under /auth/oidc/* are mounted only when set.
	OIDC *oidc.Client
	// Metrics (optional). When non-nil the WS job-finished branch
	// records job durations and the /metrics handler can pull a
	// histogram snapshot. Independent of MetricsAuthEnabled — the
	// recorder runs even if the scrape endpoint is gated off, so a
	// later config flip doesn't lose the running window.
	Metrics *metrics.Registry
}
// Server is the running HTTP server. Build one with New. The zero value
// is not usable: its listener wrapper and internal maps are allocated
// in New.
type Server struct {
	// srv is the underlying net/http server. Its Handler is the chi
	// router that routes populates.
	srv *stdhttp.Server
	// deps holds the collaborators passed to New. FleetWorker may be
	// replaced later via SetFleetWorker.
	deps Deps
	// drainLocks serialises DrainPending per host. The on-hello
	// goroutine and the 30s ticker can otherwise race for the same
	// host, double-dispatching every pending row. Map of hostID →
	// sync.Mutex; checked-and-locked atomically via drainLocksMu.
	drainLocksMu sync.Mutex
	drainLocks map[string]*sync.Mutex
	// announceRL is the per-source-IP token-bucket guarding
	// POST /api/agents/announce (P2-18). One process-local map.
	announceRL *announceLimiter
	// pendingHub holds live /ws/agent/pending sockets keyed by
	// pending_id so the accept/reject handlers can push the bearer
	// or close cleanly (P2-18b).
	pendingHub *pendingHub
	// treeCache holds per-wizard-session listings of snapshot
	// directories (P3-X2). Pre-allocated in New so the lazy-init
	// race is impossible.
	treeCache *treeCache
	// catchupDueAt tracks intermittent hosts that reconnected and are
	// in their settle window. Keyed hostID → earliest time to evaluate
	// catch-up. Best-effort + in-memory: a server restart simply re-arms
	// on the next hello. Guarded by catchupMu.
	catchupMu sync.Mutex
	catchupDueAt map[string]time.Time
}
// New assembles a Server from deps without starting it. It:
//
//   - allocates the per-server state (drain locks, the announce rate
//     limiter, the pending-socket hub, the restore tree cache and the
//     catch-up map);
//   - builds a chi router with the shared middleware;
//   - registers every route;
//   - wraps the router in a net/http.Server bound to deps.Cfg.Listen.
//
// Call Start to begin serving.
func New(deps Deps) *Server {
	s := &Server{
		deps:         deps,
		drainLocks:   make(map[string]*sync.Mutex),
		announceRL:   newAnnounceLimiter(),
		pendingHub:   newPendingHub(),
		treeCache:    newTreeCache(),
		catchupDueAt: make(map[string]time.Time),
	}

	// Shared middleware, applied in registration order:
	//   - a request ID for log correlation;
	//   - panic recovery, so a faulty handler cannot take the process down;
	//   - per-request logging.
	// It is installed before routes registers anything on the mux.
	//
	// NOTE(review): no RealIP middleware is installed here, although an
	// earlier comment said one would be "iff a trusted proxy is
	// configured". Confirm where client-IP handling lives.
	mux := chi.NewRouter()
	mux.Use(middleware.RequestID, middleware.Recoverer, requestLogger)
	s.routes(mux)

	s.srv = &stdhttp.Server{
		Addr:              deps.Cfg.Listen,
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       60 * time.Second,
		// No write deadline: WebSocket upgrades and live log streams
		// keep the response open for as long as they run.
		WriteTimeout: 0,
	}
	return s
}
// routes wires the API tree. Subtrees live in this file by area so a
// reader can scan one place and see the surface.
//
// Layout, in registration order:
//   - Public, unauthenticated endpoints: health, login/logout, bootstrap,
//     agent enrol/announce, binary and install downloads, and version.
//     Optionally also /metrics, the agent WebSocket, the UI auth pages
//     and OIDC.
//   - Viewer band (requireRole(store.RoleViewer)): read APIs and UI pages,
//     plus self-service account settings.
//   - Operator band (requireRole(store.RoleOperator)): mutating host,
//     schedule, source-group, repo and job endpoints.
//   - Admin band (requireRole(store.RoleAdmin)): agent/fleet updates, user
//     management, notification channels and settings pages.
//
// Optional collaborators gate their routes. Hub (agent WebSocket), JobHub
// (job log stream), UI (HTML pages) and OIDC (/auth/oidc/*) are mounted
// only when non-nil. /metrics is mounted only when
// Cfg.MetricsAuthEnabled() is true. Each band installs its requireRole
// middleware as the first statement of its Group.
func (s *Server) routes(r chi.Router) {
	// Public, unauthenticated.
	r.Get("/healthz", func(w stdhttp.ResponseWriter, _ *stdhttp.Request) {
		w.WriteHeader(stdhttp.StatusNoContent)
	})
	r.Post("/api/auth/login", s.handleLogin)
	r.Post("/api/auth/logout", s.handleLogout)
	r.Post("/api/bootstrap", s.handleBootstrap)
	r.Post("/api/agents/enroll", s.handleAgentEnroll)
	r.Post("/api/agents/announce", s.handleAnnounce)
	r.Get("/agent/binary", s.handleAgentBinary)
	r.Get("/install/*", s.handleInstallAsset)
	r.Get("/api/version", s.handleVersion)
	// /metrics sits outside the role bands and is mounted only when
	// Cfg.MetricsAuthEnabled(). Presumably handleMetrics checks its own
	// scrape credential — confirm.
	if s.deps.Cfg.MetricsAuthEnabled() {
		r.Get("/metrics", s.handleMetrics)
	}
	// Agent WebSocket: mounted only when an agent Hub is wired. The WS
	// handler gets its own dependency bundle plus callbacks into this
	// Server for hello, schedule-ack and schedule-fire events.
	if s.deps.Hub != nil {
		hd := ws.HandlerDeps{
			Hub:            s.deps.Hub,
			Store:          s.deps.Store,
			JobHub:         s.deps.JobHub,
			AlertEngine:    s.deps.AlertEngine,
			Metrics:        s.deps.Metrics,
			OnHello:        s.onAgentHello,
			OnScheduleAck:  s.applyScheduleAck,
			OnScheduleFire: s.dispatchScheduledJob,
		}
		// Deps.UpdateWatcher is an interface, and only the concrete
		// *ws.UpdateWatcher is forwarded to the WS handler.
		// NOTE(review): any other implementation (e.g. a test fake) is
		// silently not wired into the agent socket — confirm intended.
		if w, ok := s.deps.UpdateWatcher.(*ws.UpdateWatcher); ok && w != nil {
			hd.UpdateWatcher = w
		}
		r.Mount("/ws/agent", ws.AgentHandler(hd))
	}
	// The pending-agent socket is registered whether or not Hub is set.
	r.Get("/ws/agent/pending", s.handlePendingWS)
	r.Mount("/static/", staticHandler())
	// POST /logout is always mounted — it handles both local and OIDC
	// sessions and doesn't require the UI renderer.
	r.Post("/logout", s.handleUILogoutPost)
	// Unauthenticated UI pages: first-admin bootstrap, login and
	// account setup.
	if s.deps.UI != nil {
		r.Get("/bootstrap", s.handleUIBootstrapGet)
		r.Post("/bootstrap", s.handleUIBootstrapPost)
		r.Get("/login", s.handleUILoginGet)
		r.Post("/login", s.handleUILoginPost)
		r.Get("/setup", s.handleUISetupGet)
		r.Post("/setup", s.handleUISetupPost)
	}
	if s.deps.OIDC != nil {
		r.Get("/auth/oidc/login", s.handleOIDCLogin)
		r.Get("/auth/oidc/callback", s.handleOIDCCallback)
	}
	// Viewer band — anyone authenticated can read.
	r.Group(func(r chi.Router) {
		r.Use(s.requireRole(store.RoleViewer))
		// Read APIs.
		r.Get("/api/hosts", s.handleListHosts)
		r.Get("/api/fleet/summary", s.handleFleetSummary)
		r.Get("/api/hosts/{id}/snapshots", s.handleListHostSnapshots)
		// NOTE(review): both credential GETs are open to viewers.
		// Confirm the handlers redact secret material for this band.
		r.Get("/api/hosts/{id}/repo-credentials", s.handleGetHostCredentials)
		r.Get("/api/hosts/{id}/admin-credentials", s.handleGetAdminCredentials)
		r.Get("/api/hosts/{id}/schedules", s.handleListSchedules)
		r.Get("/api/hosts/{id}/source-groups", s.handleListSourceGroups)
		r.Get("/api/hosts/{id}/source-groups/{gid}", s.handleGetSourceGroup)
		r.Get("/api/hosts/{id}/repo-maintenance", s.handleGetRepoMaintenance)
		r.Get("/api/alerts", s.handleAPIAlerts)
		r.Get("/api/audit", s.handleAPIAudit)
		// Self-service password change: a mutation, but one any
		// authenticated user may make on their own account.
		r.Post("/api/account/password", s.handleAPIAccountPassword)
		// Job log stream + download (read-only; any authenticated user).
		if s.deps.JobHub != nil {
			r.Get("/api/jobs/{id}/stream", s.handleJobStream)
		}
		r.Get("/api/jobs/{id}/log.{format:txt|ndjson}", s.handleJobLogDownload)
		if s.deps.UI != nil {
			r.Get("/", s.handleUIDashboard)
			r.Get("/hosts/{id}", s.handleUIHostDetail)
			r.Get("/hosts/{id}/sources", s.handleUIHostSources)
			r.Get("/hosts/{id}/sources/new", s.handleUISourceGroupNewGet)
			r.Get("/hosts/{id}/sources/{gid}/edit", s.handleUISourceGroupEditGet)
			r.Get("/hosts/{id}/jobs", s.handleUIHostJobs)
			r.Get("/hosts/{id}/repo", s.handleUIHostRepo)
			r.Get("/hosts/{id}/repo/trend", s.handleUIRepoTrend)
			r.Get("/hosts/{id}/schedules", s.handleUISchedulesList)
			r.Get("/hosts/{id}/schedules/new", s.handleUIScheduleNewGet)
			r.Get("/hosts/{id}/schedules/{sid}/edit", s.handleUIScheduleEditGet)
			r.Get("/jobs/{id}", s.handleUIJobDetail)
			r.Get("/hosts/{id}/restore", s.handleUIRestoreGet)
			r.Get("/hosts/{id}/snapshots/{sid}/restore", s.handleUIRestoreGet)
			r.Get("/hosts/{id}/restore/tree", s.handleUIRestoreTree)
			r.Get("/alerts", s.handleUIAlerts)
			r.Get("/audit", s.handleUIAudit)
			r.Get("/audit.csv", s.handleUIAuditCSV)
			r.Get("/settings/account", s.handleUIAccountGet)
			r.Post("/settings/account", s.handleUIAccountPost)
		}
	})
	// Operator band — mutating endpoints up to backup ops.
	r.Group(func(r chi.Router) {
		r.Use(s.requireRole(store.RoleOperator))
		// Pending hosts approval.
		r.Post("/api/pending-hosts/{id}/accept", s.handleAcceptPendingHost)
		r.Post("/api/pending-hosts/{id}/reject", s.handleRejectPendingHost)
		r.Post("/api/enrollment-tokens", s.handleCreateEnrollmentToken)
		// These two UI-named form handlers are registered even when
		// UI is nil.
		r.Post("/hosts/enrollment-tokens/{hash}/regenerate", s.handleUIEnrollmentTokenRegenerate)
		r.Post("/hosts/enrollment-tokens/{hash}/revoke", s.handleUIEnrollmentTokenRevoke)
		// Run-now, restore, repo ops (JSON).
		r.Post("/api/hosts/{id}/jobs", s.handleRunNow)
		r.Put("/api/hosts/{id}/repo-credentials", s.handleSetHostCredentials)
		r.Put("/api/hosts/{id}/admin-credentials", s.handleSetAdminCredentials)
		r.Delete("/api/hosts/{id}/admin-credentials", s.handleDeleteAdminCredentials)
		r.Post("/api/hosts/{id}/schedules", s.handleCreateSchedule)
		r.Put("/api/hosts/{id}/schedules/{sid}", s.handleUpdateSchedule)
		r.Delete("/api/hosts/{id}/schedules/{sid}", s.handleDeleteSchedule)
		r.Post("/api/hosts/{id}/source-groups", s.handleCreateSourceGroup)
		r.Put("/api/hosts/{id}/source-groups/{gid}", s.handleUpdateSourceGroup)
		r.Delete("/api/hosts/{id}/source-groups/{gid}", s.handleDeleteSourceGroup)
		r.Put("/api/hosts/{id}/repo-maintenance", s.handleUpdateRepoMaintenance)
		r.Put("/api/hosts/{id}/bandwidth", s.handleUpdateHostBandwidth)
		r.Post("/api/hosts/{id}/source-groups/{gid}/run", s.handleRunSourceGroup)
		r.Post("/api/hosts/{id}/repo/prune", s.handleRunRepoPrune)
		r.Post("/api/hosts/{id}/repo/check", s.handleRunRepoCheck)
		r.Post("/api/hosts/{id}/repo/unlock", s.handleRunRepoUnlock)
		r.Post("/api/jobs/{id}/cancel", s.handleCancelJob)
		r.Post("/api/hosts/{id}/snapshots/diff", s.handleSnapshotDiff)
		// HTMX form variants outside /api. They share handlers with the
		// JSON routes above and are registered regardless of UI.
		r.Post("/hosts/{id}/snapshots/diff", s.handleSnapshotDiff)
		r.Post("/hosts/{id}/source-groups/{gid}/run", s.handleRunSourceGroup)
		r.Post("/hosts/{id}/repo/prune", s.handleRunRepoPrune)
		r.Post("/hosts/{id}/repo/check", s.handleRunRepoCheck)
		r.Post("/hosts/{id}/repo/unlock", s.handleRunRepoUnlock)
		// Handlers suffixed "Gone" — presumably retired endpoints kept
		// so stale forms get a clear answer; confirm.
		r.Post("/hosts/{id}/run-backup", s.handleUIRunBackupGone)
		r.Post("/hosts/{id}/init-repo", s.handleUIInitRepoGone)
		if s.deps.UI != nil {
			r.Get("/hosts/new", s.handleUIAddHostGet)
			r.Post("/hosts/new", s.handleUIAddHostPost)
			r.Get("/hosts/pending/{token}", s.handleUIPendingHost)
			r.Get("/hosts/pending/{token}/awaiting", s.handleUIPendingAwaiting)
			r.Post("/hosts/{id}/sources/new", s.handleUISourceGroupSave)
			r.Post("/hosts/{id}/sources/{gid}/edit", s.handleUISourceGroupSave)
			r.Post("/hosts/{id}/sources/{gid}/delete", s.handleUISourceGroupDelete)
			r.Post("/hosts/{id}/repo/credentials", s.handleUIRepoCredentialsSave)
			r.Post("/hosts/{id}/repo/bandwidth", s.handleUIRepoBandwidthSave)
			r.Post("/hosts/{id}/repo/maintenance", s.handleUIRepoMaintenanceSave)
			r.Post("/hosts/{id}/repo/reinit", s.handleUIRepoReinit)
			r.Post("/hosts/{id}/repo/probe", s.handleUIRepoProbe)
			r.Post("/hosts/{id}/repo/hooks", s.handleUIRepoHooksSave)
			r.Post("/hosts/{id}/tags", s.handleUIHostTagsSave)
			r.Post("/hosts/{id}/mode", s.handleUIHostModeSave)
			r.Post("/hosts/{id}/admin-credentials", s.handleUIAdminCredentialsSave)
			r.Post("/hosts/{id}/admin-credentials/delete", s.handleUIAdminCredentialsDelete)
			r.Post("/hosts/{id}/schedules/new", s.handleUIScheduleSave)
			r.Post("/hosts/{id}/schedules/{sid}/edit", s.handleUIScheduleSave)
			r.Post("/hosts/{id}/schedules/{sid}/delete", s.handleUIScheduleDelete)
			r.Post("/hosts/{id}/schedules/{sid}/run", s.handleUIScheduleRun)
			r.Post("/hosts/{id}/restore", s.handleUIRestorePost)
			r.Post("/alerts/{id}/acknowledge", s.handleUIAlertAcknowledge)
			r.Post("/alerts/{id}/resolve", s.handleUIAlertResolve)
		}
	})
	// Admin band — channels, server-shape config.
	r.Group(func(r chi.Router) {
		r.Use(s.requireRole(store.RoleAdmin))
		r.Post("/api/hosts/{id}/update", s.handleHostUpdate)
		r.Post("/hosts/{id}/update", s.handleHostUpdateForm)
		// Fleet update (P6-15): rolling update across many hosts.
		// Registered even when FleetWorker is nil. Deps documents that
		// these endpoints then report unavailable.
		r.Post("/api/fleet/update", s.handleAPIFleetUpdateStart)
		r.Post("/api/fleet-updates/{id}/cancel", s.handleAPIFleetUpdateCancel)
		r.Get("/api/fleet-updates/{id}", s.handleAPIFleetUpdateGet)
		// User management.
		r.Get("/api/users", s.handleAPIUsersList)
		r.Post("/api/users", s.handleAPIUserCreate)
		r.Get("/api/users/{id}", s.handleAPIUserGet)
		r.Patch("/api/users/{id}", s.handleAPIUserPatch)
		r.Post("/api/users/{id}/disable", s.handleAPIUserDisable)
		r.Post("/api/users/{id}/enable", s.handleAPIUserEnable)
		r.Post("/api/users/{id}/regenerate-setup", s.handleAPIUserRegenerateSetup)
		r.Post("/api/users/{id}/force-logout", s.handleAPIUserForceLogout)
		r.Post("/api/notifications/{id}/test", s.handleAPINotificationTest)
		if s.deps.UI != nil {
			r.Post("/hosts/{id}/delete", s.handleUIHostDelete)
			r.Get("/settings", s.handleUISettings)
			r.Get("/settings/fleet-update", s.handleUIFleetUpdate)
			r.Get("/settings/fleet-update/partial", s.handleUIFleetUpdatePartial)
			r.Get("/settings/users", s.handleUIUsersList)
			r.Get("/settings/users/new", s.handleUIUserNewGet)
			r.Post("/settings/users/new", s.handleUIUserNewPost)
			r.Get("/settings/users/{id}/edit", s.handleUIUserEditGet)
			r.Post("/settings/users/{id}/edit", s.handleUIUserEditPost)
			r.Post("/settings/users/{id}/disable", s.handleUIUserDisablePost)
			r.Post("/settings/users/{id}/enable", s.handleUIUserEnablePost)
			r.Post("/settings/users/{id}/regenerate-setup", s.handleUIUserRegenerateSetupPost)
			r.Post("/settings/users/{id}/force-logout", s.handleUIUserForceLogoutPost)
			r.Get("/settings/users/{id}/setup-link", s.handleUIUserSetupLinkGet)
			r.Get("/settings/notifications", s.handleUINotificationsList)
			r.Get("/settings/notifications/new", s.handleUINotificationNewGet)
			r.Post("/settings/notifications/new", s.handleUINotificationNewPost)
			r.Get("/settings/notifications/{id}/edit", s.handleUINotificationEditGet)
			r.Post("/settings/notifications/{id}/edit", s.handleUINotificationEditPost)
			r.Post("/settings/notifications/{id}/delete", s.handleUINotificationDelete)
			r.Post("/settings/notifications/{id}/toggle", s.handleUINotificationToggle)
		}
	})
}
// Start listens on the configured address and serves HTTP, blocking for
// the lifetime of the server.
//
// A stop requested through Shutdown surfaces from net/http as
// ErrServerClosed. It counts as a clean exit and yields nil. Any other
// error, such as failing to bind the address, is returned unchanged.
//
// The server speaks plain HTTP only, by design. Production deployments
// terminate TLS at a reverse proxy in front of it.
func (s *Server) Start() error {
	if err := s.srv.ListenAndServe(); !errors.Is(err, stdhttp.ErrServerClosed) {
		return err
	}
	return nil
}
// Shutdown stops the server gracefully. It closes the listener so no new
// connections are accepted, then waits for in-flight requests to finish.
// It gives up, returning ctx's error, once ctx is done (cancelled or past
// its deadline).
//
// Per the net/http documentation, Shutdown neither closes nor waits for
// hijacked connections such as WebSockets. Long-lived sockets (e.g. those
// served under /ws/agent) must be torn down separately.
func (s *Server) Shutdown(ctx context.Context) error {
	err := s.srv.Shutdown(ctx)
	return err
}
// SetFleetWorker installs fw as the fleet-update worker after New has
// run. cmd/server needs this to break a construction cycle: the worker
// depends on a dispatcher that delegates back into this server's
// host-update path, so the server must exist first.
//
// The field is written without any locking. NOTE(review): call this
// before Start, because handlers presumably read deps.FleetWorker
// concurrently once serving.
func (s *Server) SetFleetWorker(fw FleetWorker) {
	s.deps.FleetWorker = fw
}
// DispatchHostUpdate is the public entry point for callers (the fleet
// worker) that need to drive the same dispatch path the HTTP handler
// uses, without going through HTTP.
//
// actorUserID is the user the update is attributed to. An empty string
// is passed down as a nil actor rather than a pointer to "".
//
// It returns the dispatched job's ID and the result code reported by
// dispatchHostUpdate, so the caller can map codes to its own status enum.
// NOTE(review): the code is presumably empty on success; confirm against
// dispatchHostUpdate. err is currently always nil, because dispatch
// failures are reported through code rather than as a Go error.
func (s *Server) DispatchHostUpdate(ctx context.Context, hostID, actorUserID string) (jobID string, code string, err error) {
	var actorID *string
	if actorUserID != "" {
		actorID = &actorUserID
	}
	// The actor kind is always "user" on this path, even when no
	// specific user ID was supplied.
	res := s.dispatchHostUpdate(ctx, hostID, "user", actorID)
	// The previous if/else returned (res.JobID, res.Code, nil) on both
	// branches ("" == res.Code when the code is empty), so it collapses
	// to a single return.
	return res.JobID, res.Code, nil
}
// Addr returns the listen address the server was configured with
// (deps.Cfg.Listen, as passed to New).
//
// This is the configured value, not the bound one. net/http's
// ListenAndServe does not write the chosen port back into Server.Addr,
// so a configured ":0" is returned as ":0" rather than as the randomly
// assigned port. Tests that need the real port must obtain it another
// way, e.g. by listening themselves.
func (s *Server) Addr() string {
	return s.srv.Addr
}