Files
restic-manager/cmd/server/main.go
T
steve 7a813cacd3 first-run: keep 'bootstrap token' phrase so e2e log-scraper still matches
The CI e2e workflow greps for 'bootstrap token' in server logs to capture
the one-shot token. The earlier reword dropped that phrase; restore it on
the headless-instructions line so .gitea/workflows/e2e.yml step 'Capture
bootstrap token from server logs' keeps matching.
2026-05-09 12:49:40 +01:00

278 lines
9.3 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/alert"
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
"gitea.dcglab.co.uk/steve/restic-manager/internal/notification"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/fleetupdate"
rmhttp "gitea.dcglab.co.uk/steve/restic-manager/internal/server/http"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/maintenance"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/metrics"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/oidc"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// Build metadata. version is printed by -version, handed to the HTTP layer
// and logged at startup; commit and date appear only in the -version output.
// NOTE(review): the placeholder defaults are presumably overridden at link
// time (-ldflags "-X main.version=..."); confirm against the build scripts.
// They must stay plain string vars for that to work.

// version is the release identifier of this build.
var version = "dev"

// commit is the VCS revision this build was produced from.
var commit = "none"

// date is the build timestamp.
var date = "unknown"
// main is the process entry point. All real work happens in run so that
// deferred cleanup there executes before the process exits; a non-nil
// result is logged and turned into exit status 1.
func main() {
	err := run()
	if err == nil {
		return
	}
	slog.Error("server fatal", "err", err)
	os.Exit(1)
}
// run wires up every server dependency, starts the HTTP listener and the
// background workers, and blocks until the listener fails or the process
// receives SIGINT/SIGTERM.
//
// Startup sequence:
//  1. Parse flags (-config, -version); -version prints build info and returns.
//  2. Install a JSON slog logger on stdout as the default logger.
//  3. Load configuration and ensure the data directory exists (mode 0700).
//  4. Generate the secret key file if it is missing, load it, build the AEAD.
//  5. Open the store at <data_dir>/restic-manager.db.
//  6. Build the hubs, alert engine, update watcher, UI renderer and, when
//     configured, the OIDC client.
//  7. If no users exist, mint a one-shot bootstrap token and print it.
//  8. Start the HTTP server, fleet-update worker and periodic sweepers.
//
// It returns nil on a clean shutdown (or after -version), and a wrapped
// error naming the failing step otherwise.
func run() error {
	configPath := flag.String("config", "", "path to YAML config (optional; env vars win regardless)")
	showVersion := flag.Bool("version", false, "print version and exit")
	flag.Parse()
	if *showVersion {
		fmt.Printf("restic-manager-server %s (commit %s, built %s)\n", version, commit, date)
		return nil
	}

	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
	slog.SetDefault(logger)

	cfg, err := config.Load(*configPath)
	if err != nil {
		return fmt.Errorf("config: %w", err)
	}
	slog.Info("config resolved", "listen", cfg.Listen, "data_dir", cfg.DataDir,
		"cookie_secure", cfg.CookieSecure, "trusted_proxies", cfg.TrustedProxies)

	if err := os.MkdirAll(cfg.DataDir, 0o700); err != nil {
		return fmt.Errorf("ensure data dir: %w", err)
	}

	// Mint or load the encryption key. Only a definite "does not exist"
	// triggers generation; any other Stat failure falls through and is
	// reported by the load below.
	if _, err := os.Stat(cfg.SecretKeyFile); errors.Is(err, os.ErrNotExist) {
		slog.Warn("no secret key found; generating a new one — back this up before continuing",
			"path", cfg.SecretKeyFile)
		if err := crypto.GenerateKeyFile(cfg.SecretKeyFile); err != nil {
			return fmt.Errorf("generate secret key: %w", err)
		}
	}
	keyBytes, err := crypto.LoadKeyFromFile(cfg.SecretKeyFile)
	if err != nil {
		return fmt.Errorf("load secret key: %w", err)
	}
	aead, err := crypto.NewAEAD(keyBytes)
	if err != nil {
		return fmt.Errorf("init AEAD: %w", err)
	}

	dbPath := filepath.Join(cfg.DataDir, "restic-manager.db")
	st, err := store.Open(context.Background(), dbPath)
	if err != nil {
		return fmt.Errorf("open store: %w", err)
	}
	// Close errors are deliberately dropped: by the time this runs the
	// process is exiting and there is nothing useful left to do with them.
	defer func() { _ = st.Close() }()

	hub := ws.NewHub()
	jobHub := ws.NewJobHub()
	metricsRegistry := metrics.NewRegistry()
	notifHub := notification.NewHub(st, aead, cfg.BaseURL)
	alertEngine := alert.NewEngine(st, notifHub)
	updateWatcher := ws.NewUpdateWatcher(st, alertEngine, jobHub)

	renderer, err := ui.New()
	if err != nil {
		return fmt.Errorf("ui: %w", err)
	}

	// OIDC is optional; construction is bounded to 10s so an unreachable
	// issuer cannot hang startup indefinitely.
	var oidcClient *oidc.Client
	if cfg.OIDC != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		oidcClient, err = oidc.New(ctx, cfg.OIDC, cfg.BaseURL)
		if err != nil {
			return fmt.Errorf("oidc: %w", err)
		}
		slog.Info("oidc enabled", "issuer", cfg.OIDC.Issuer, "display", cfg.OIDC.DisplayName)
	}

	deps := rmhttp.Deps{
		Cfg:             cfg,
		Store:           st,
		AEAD:            aead,
		Hub:             hub,
		JobHub:          jobHub,
		AlertEngine:     alertEngine,
		NotificationHub: notifHub,
		UpdateWatcher:   updateWatcher,
		UI:              renderer,
		Version:         version,
		OIDC:            oidcClient,
		Metrics:         metricsRegistry,
	}

	// First-run bootstrap: if the users table is empty, mint a one-time
	// token and print it. /api/bootstrap accepts it to create the first
	// admin user, then becomes a no-op.
	n, err := st.CountUsers(context.Background())
	if err != nil {
		return fmt.Errorf("count users: %w", err)
	}
	if n == 0 {
		token, err := auth.NewToken()
		if err != nil {
			return fmt.Errorf("mint bootstrap token: %w", err)
		}
		// NOTE(review): an earlier comment here said the token is hashed
		// into BootstrapToken by the server-side handler; this code hands
		// over the plain value — confirm where (or whether) hashing happens.
		deps.BootstrapToken = token
		printBootstrapBanner(cfg.BaseURL, token)
	}

	srv := rmhttp.New(deps)

	// Fleet-update worker — built after the HTTP server because the
	// dispatcher delegates back into srv.DispatchHostUpdate.
	fleetWorker := fleetupdate.NewWorker(st, hub,
		&serverDispatcher{srv: srv}, alertEngine)
	srv.SetFleetWorker(fleetWorker)

	// ctx is cancelled on SIGINT/SIGTERM and is the stop signal for every
	// background goroutine started below.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	go alertEngine.Run(ctx)
	go updateWatcher.Run(ctx)

	// Buffered so the listener goroutine can always deliver its result and
	// exit, even when shutdown was triggered by a signal and nobody reads.
	errCh := make(chan error, 1)
	go func() {
		slog.Info("server listening", "addr", cfg.Listen, "version", version)
		errCh <- srv.Start()
	}()

	// Background sweepers:
	//   - sessions/tokens purge: 15 min
	//   - host offline-after-90s mark: every 30s (matches heartbeat
	//     cadence — agent sends every 30s, P1-12)
	purgeTick := time.NewTicker(15 * time.Minute)
	defer purgeTick.Stop()
	offlineTick := time.NewTicker(30 * time.Second)
	defer offlineTick.Stop()

	// Maintenance ticker: drives forget/prune/check on the cadences
	// operators set per-host. Independent of the agent's local cron
	// (which only handles backup schedules). 60s cadence — the cron
	// expressions are minute-grained, so anything finer is wasted
	// work.
	maintenanceTick := time.NewTicker(60 * time.Second)
	defer maintenanceTick.Stop()

	// Pending-runs drain ticker: 30s cadence sweeps every host with
	// pending_runs rows whose next_attempt_at <= now (rows accumulate
	// when a schedule.fire's command.run send fails because the agent
	// dropped offline mid-flight). The on-reconnect path in
	// onAgentHello handles the common case; this ticker is the
	// safety-net for hosts that come back without a fresh hello (they
	// shouldn't, but the queue exists either way).
	pendingDrainTick := time.NewTicker(30 * time.Second)
	defer pendingDrainTick.Stop()

	// Pending-hosts expiry sweeper: drops announce rows past their 1h
	// ceiling so the dashboard panel doesn't accumulate stale entries.
	pendingExpiryTick := time.NewTicker(60 * time.Second)
	defer pendingExpiryTick.Stop()

	mt := maintenance.New(st)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-purgeTick.C:
				// Sweep failures are logged rather than dropped: a store
				// that keeps failing would otherwise stop purging with no
				// visible signal. The loop carries on to the next tick.
				if n, err := st.PurgeExpiredSessions(ctx); err != nil {
					slog.Warn("sweeper: purge expired sessions", "err", err)
				} else if n > 0 {
					slog.Info("purged expired sessions", "n", n)
				}
				if n, err := st.PurgeExpiredEnrollmentTokens(ctx); err != nil {
					slog.Warn("sweeper: purge expired enrollment tokens", "err", err)
				} else if n > 0 {
					slog.Info("purged expired enrollment tokens", "n", n)
				}
			case <-offlineTick.C:
				// A host is considered stale once its last heartbeat is
				// older than 90s, i.e. three missed 30s heartbeats.
				cutoff := time.Now().Add(-90 * time.Second)
				ids, err := st.MarkHostsOfflineStaleReturnIDs(ctx, cutoff)
				if err != nil {
					slog.Warn("sweeper: mark hosts offline", "err", err)
				} else if len(ids) > 0 {
					slog.Info("marked hosts offline (stale heartbeat)", "n", len(ids))
					for _, id := range ids {
						alertEngine.NotifyHostOffline(id)
					}
				}
			case <-pendingDrainTick.C:
				srv.DrainAllDue(ctx)
			case <-pendingExpiryTick.C:
				if n, err := st.DeleteExpiredPendingHosts(ctx, time.Now().UTC()); err != nil {
					slog.Warn("sweeper: delete expired pending hosts", "err", err)
				} else if n > 0 {
					slog.Info("expired pending hosts swept", "n", n)
				}
			case <-maintenanceTick.C:
				decisions, err := mt.Decide(ctx, time.Now().UTC())
				if err != nil {
					slog.Warn("maintenance ticker: decide", "err", err)
					continue
				}
				if len(decisions) > 0 {
					slog.Info("maintenance ticker: dispatching", "n", len(decisions))
					srv.DispatchMaintenance(ctx, decisions)
				}
			}
		}
	}()

	// Block until the listener stops on its own or a signal arrives. A
	// nil listener result falls through to the shutdown below.
	select {
	case err := <-errCh:
		if err != nil {
			return fmt.Errorf("listen: %w", err)
		}
	case <-ctx.Done():
		slog.Info("shutting down")
	}

	// ctx is already cancelled at this point, so shutdown gets its own
	// context with a 15s budget.
	shutCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutCtx); err != nil {
		return fmt.Errorf("shutdown: %w", err)
	}
	return nil
}

// printBootstrapBanner writes the first-run instructions and the one-shot
// bootstrap token to stderr. baseURL may be empty, in which case a generic
// hint is printed instead of a direct link. The ruled lines are a stable,
// easy-to-grep marker so an operator finds the token in scrolling logs.
// The phrase "bootstrap token" must stay in the output: the commit note on
// this file says the CI e2e workflow greps server logs for it.
func printBootstrapBanner(baseURL, token string) {
	fmt.Fprintln(os.Stderr, "================================================================")
	fmt.Fprintln(os.Stderr, " FIRST RUN — no admin user exists yet.")
	if baseURL != "" {
		fmt.Fprintln(os.Stderr, " Open this URL in a browser to create the first administrator:")
		fmt.Fprintln(os.Stderr, " "+strings.TrimRight(baseURL, "/")+"/bootstrap")
	} else {
		fmt.Fprintln(os.Stderr, " Open the server URL in a browser; you'll be sent to /bootstrap.")
		fmt.Fprintln(os.Stderr, " (Set RM_BASE_URL to have a clickable link printed here.)")
	}
	fmt.Fprintln(os.Stderr, "")
	fmt.Fprintln(os.Stderr, " Headless? POST {token, username, password} to /api/bootstrap")
	fmt.Fprintln(os.Stderr, " with this one-shot bootstrap token (valid until first user exists):")
	fmt.Fprintln(os.Stderr, " "+token)
	fmt.Fprintln(os.Stderr, "================================================================")
}
// serverDispatcher bridges the HTTP server and the fleet-update worker: it
// exposes the server's DispatchHostUpdate through the fleetupdate.Dispatcher
// interface. Keeping the adapter in package main means neither the http
// package nor the fleetupdate package has to import the other.
type serverDispatcher struct {
	// srv is the HTTP server every dispatch call is forwarded to.
	srv *rmhttp.Server
}
// DispatchUpdate forwards the request unchanged to the wrapped server's
// DispatchHostUpdate and hands its three results straight back.
// NOTE(review): the meaning of the two returned strings is defined by
// rmhttp.Server.DispatchHostUpdate and is not visible here.
func (d *serverDispatcher) DispatchUpdate(ctx context.Context, hostID, actorUserID string) (string, string, error) {
	first, second, err := d.srv.DispatchHostUpdate(ctx, hostID, actorUserID)
	return first, second, err
}