phase 1: WS transport, enrollment, agent that hellos and heartbeats
Lands the protocol layer end-to-end: an agent can be enrolled through the operator UI, store credentials, dial back to the server over WS, complete the protocol_version handshake, and stay connected with periodic heartbeats.

Server side:
- P1-09 ws.Hub: one Conn per host_id, last-write-wins eviction, json envelope writer with a write mutex, reader, error envelopes.
- P1-09 ws.AgentHandler: bearer-auth, accept upgrade, hello-stage (10s deadline, protocol_version checked against api.MinAgentProtocolVersion → ErrProtocolTooOld with help URL on reject), main read loop, defer hub register/unregister.
- P1-10 POST /api/agents/enroll consumes a one-time token, mints a persistent agent bearer (sha-256 stored), creates a host row.
- P1-10 POST /api/enrollment-tokens (operator, session-auth) issues a 1h one-time token.
- P1-11 hello upserts agent_version + restic_version + protocol_version on the host row, flips status to online.
- P1-12 heartbeat touches last_seen_at; background sweeper marks hosts offline after 90s without one.
- store: hosts table accessors, host_schedule_version, enrollment_tokens FK on consumed_host dropped (audit-only field; the token gets burned before the host row exists).

Agent side:
- P1-13 internal/agent/config: yaml at /etc/restic-manager/agent.yaml, atomic Save (tmp+fsync+rename), Enrolled() helper.
- P1-15 internal/agent/wsclient: dial with bearer + optional TLS cert pinning (sha-256 of leaf), exponential backoff with jitter (1s → 60s cap), heartbeat goroutine, fatal handling for ErrProtocolTooOld.
- P1-15 wsclient.Enroll: HTTP POST /api/agents/enroll with sysinfo.
- P1-17 internal/agent/sysinfo: hostname/OS/arch/restic-version collection. restic detected by `restic version` parse; absent restic doesn't block startup.
- cmd/agent: -enroll-server / -enroll-token flags drive first-run enrollment then exit (so the install script can hand off to systemd to run the persistent service).
End-to-end smoke verified: bootstrap → login → issue token → enroll → run agent → server logs `ws agent connected` with the right host_id and protocol_version 1. All tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+119
-4
@@ -2,32 +2,147 @@ package main
|
||||
|
||||
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
|
||||
|
||||
// version is the agent's version string: it is printed by -version,
// logged at startup, and sent to the server as the agent version in
// both the enrollment request and the hello payload.
// NOTE(review): the "dev" default is presumably overridden at build
// time (e.g. via -ldflags -X) — confirm against the build scripts.
var version = "dev"
|
||||
|
||||
func main() {
|
||||
if err := run(); err != nil {
|
||||
slog.Error("agent fatal", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run() error {
|
||||
configPath := flag.String("config", config.DefaultPath(), "path to agent.yaml")
|
||||
enrollServer := flag.String("enroll-server", "", "server URL (used with -enroll-token to perform first-run enrollment)")
|
||||
enrollToken := flag.String("enroll-token", "", "one-time enrollment token (operator copies this from the UI)")
|
||||
showVersion := flag.Bool("version", false, "print version and exit")
|
||||
flag.Parse()
|
||||
|
||||
if *showVersion {
|
||||
fmt.Println("restic-manager-agent", version)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
||||
slog.SetDefault(logger)
|
||||
|
||||
cfg, err := config.Load(*configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("config: %w", err)
|
||||
}
|
||||
|
||||
// Enrollment mode: agent was started with -enroll-server -enroll-token.
|
||||
// On success we persist the credentials and exit (the install script
|
||||
// then starts the agent service). Avoiding a long-running process here
|
||||
// keeps the enrollment story restartable.
|
||||
if *enrollToken != "" {
|
||||
if *enrollServer == "" {
|
||||
return errors.New("enrollment: -enroll-server is required with -enroll-token")
|
||||
}
|
||||
return doEnroll(*enrollServer, *enrollToken, cfg, version)
|
||||
}
|
||||
|
||||
if !cfg.Enrolled() {
|
||||
return fmt.Errorf("agent is not enrolled; run with -enroll-server and -enroll-token first (config %q)", *configPath)
|
||||
}
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
slog.Info("restic-manager agent starting", "version", version)
|
||||
<-ctx.Done()
|
||||
slog.Info("shutting down")
|
||||
snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sysinfo: %w", err)
|
||||
}
|
||||
slog.Info("agent starting",
|
||||
"version", version,
|
||||
"host_id", cfg.HostID,
|
||||
"server", cfg.ServerURL,
|
||||
"restic_version", snap.ResticVersion,
|
||||
"protocol_version", snap.ProtocolVersion,
|
||||
)
|
||||
|
||||
wsCfg := wsclient.Config{
|
||||
ServerURL: cfg.ServerURL,
|
||||
AgentToken: cfg.AgentToken,
|
||||
HostID: cfg.HostID,
|
||||
CertPinSHA256: cfg.CertPinSHA256,
|
||||
HelloPayload: api.HelloPayload{
|
||||
ProtocolVersion: snap.ProtocolVersion,
|
||||
AgentVersion: version,
|
||||
ResticVersion: snap.ResticVersion,
|
||||
Hostname: snap.Hostname,
|
||||
OS: snap.OS,
|
||||
Arch: snap.Arch,
|
||||
},
|
||||
}
|
||||
|
||||
if err := wsclient.Run(ctx, wsCfg, dispatch); err != nil {
|
||||
return fmt.Errorf("ws run: %w", err)
|
||||
}
|
||||
slog.Info("agent shutting down")
|
||||
return nil
|
||||
}
|
||||
|
||||
// dispatch handles server-pushed envelopes. Phase 1's first slice
|
||||
// just logs; P1-19/20/21 wire command.run to the runner.
|
||||
func dispatch(_ context.Context, env api.Envelope) error {
|
||||
switch env.Type {
|
||||
case api.MsgCommandRun:
|
||||
slog.Info("ws agent: command.run received (not yet implemented)", "id", env.ID)
|
||||
case api.MsgCommandCancel:
|
||||
slog.Info("ws agent: command.cancel received (not yet implemented)", "id", env.ID)
|
||||
case api.MsgScheduleSet:
|
||||
slog.Info("ws agent: schedule.set received (not yet implemented)", "id", env.ID)
|
||||
case api.MsgConfigUpdate:
|
||||
slog.Info("ws agent: config.update received (not yet implemented)", "id", env.ID)
|
||||
case api.MsgAgentUpdateAvail:
|
||||
slog.Info("ws agent: agent.update.available received (not yet implemented)", "id", env.ID)
|
||||
default:
|
||||
slog.Debug("ws agent: ignored message", "type", env.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*1e9)
|
||||
defer cancel()
|
||||
|
||||
snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sysinfo: %w", err)
|
||||
}
|
||||
res, err := wsclient.Enroll(ctx, serverURL, wsclient.EnrollRequest{
|
||||
Token: token,
|
||||
HostName: snap.Hostname,
|
||||
OS: snap.OS,
|
||||
Arch: snap.Arch,
|
||||
AgentVersion: agentVersion,
|
||||
ResticVersion: snap.ResticVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("enroll: %w", err)
|
||||
}
|
||||
cfg.ServerURL = serverURL
|
||||
cfg.HostID = res.HostID
|
||||
cfg.AgentToken = res.AgentToken
|
||||
cfg.CertPinSHA256 = res.CertPinSHA256
|
||||
if err := cfg.Save(); err != nil {
|
||||
return fmt.Errorf("save config: %w", err)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "enrolled as host %s on %s\n", res.HostID, serverURL)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user