phase 1: run-now backup — restic wrapper, job lifecycle, end-to-end
Lands the operator → server → agent → restic → server roundtrip for
on-demand backups. The flow:
POST /api/hosts/{id}/jobs {kind:"backup",args:["/path"]}
→ server creates a queued Job row
→ server emits command.run over WS to the host's agent
→ agent dispatcher spawns runner.RunBackup in a goroutine
→ runner spawns `restic backup --json`, parses each line
→ forwards: job.started, log.stream (every line), job.progress
(throttled to 1/sec), job.finished (with summary stats blob)
→ server WS handler persists those into jobs / job_logs
P1-16 internal/restic: thin Locate + Env wrapper that runs `restic
backup --json`, scans stdout/stderr, parses BackupStatus +
BackupSummary, calls back into a LineHandler so the agent can fan
out to log.stream + job.progress. Treats exit code 3 as
"succeeded with issues" (matches restic's contract).
P1-18 store: jobs accessors (CreateJob, MarkJobStarted,
MarkJobFinished, AppendJobLog, GetJob).
P1-19 server: POST /api/hosts/{id}/jobs creates the Job row,
validates kind, dispatches via Hub.Send, audit-logs the action.
P1-20 agent runner: wraps restic.RunBackup with throttled progress
emission. Sender abstraction was added to wsclient.Handler so
background goroutines can keep replying after dispatch returns.
P1-21 server WS: dispatchAgentMessage now persists job.started,
job.finished, log.stream into the database. Browser fan-out for
live tailing lands with the UI work.
Agent gets repo_url + repo_password from agent.yaml in plaintext
for now (mode 0600, owned by service user); spec.md §7.3's keyring
storage moves there in P2. config.update over WS overrides the
in-memory copy (does not persist).
Build clean; all tests pass. End-to-end with a real restic still
needs a host that has restic installed — wire shape verified by
the existing hello/heartbeat round-trip test.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+81
-14
@@ -9,11 +9,14 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
|
||||
)
|
||||
|
||||
var version = "dev"
|
||||
@@ -45,10 +48,6 @@ func run() error {
|
||||
return fmt.Errorf("config: %w", err)
|
||||
}
|
||||
|
||||
// Enrollment mode: agent was started with -enroll-server -enroll-token.
|
||||
// On success we persist the credentials and exit (the install script
|
||||
// then starts the agent service). Avoiding a long-running process here
|
||||
// keeps the enrollment story restartable.
|
||||
if *enrollToken != "" {
|
||||
if *enrollServer == "" {
|
||||
return errors.New("enrollment: -enroll-server is required with -enroll-token")
|
||||
@@ -75,6 +74,8 @@ func run() error {
|
||||
"protocol_version", snap.ProtocolVersion,
|
||||
)
|
||||
|
||||
resticBin, _ := restic.Locate(cfg.ResticPath) // empty is fine; commands fail with a clear error later
|
||||
|
||||
wsCfg := wsclient.Config{
|
||||
ServerURL: cfg.ServerURL,
|
||||
AgentToken: cfg.AgentToken,
|
||||
@@ -90,35 +91,101 @@ func run() error {
|
||||
},
|
||||
}
|
||||
|
||||
if err := wsclient.Run(ctx, wsCfg, dispatch); err != nil {
|
||||
d := &dispatcher{
|
||||
resticBin: resticBin,
|
||||
repoURL: cfg.RepoURL,
|
||||
repoPassword: cfg.RepoPassword,
|
||||
}
|
||||
if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil {
|
||||
return fmt.Errorf("ws run: %w", err)
|
||||
}
|
||||
slog.Info("agent shutting down")
|
||||
return nil
|
||||
}
|
||||
|
||||
// dispatch handles server-pushed envelopes. Phase 1's first slice
|
||||
// just logs; P1-19/20/21 wire command.run to the runner.
|
||||
func dispatch(_ context.Context, env api.Envelope) error {
|
||||
// dispatcher closes over the long-lived agent settings (restic path,
|
||||
// repo creds) so handle() can spawn the runner without re-loading
|
||||
// config every time.
|
||||
type dispatcher struct {
|
||||
resticBin string
|
||||
repoURL string
|
||||
repoPassword string
|
||||
}
|
||||
|
||||
func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error {
|
||||
switch env.Type {
|
||||
case api.MsgCommandRun:
|
||||
slog.Info("ws agent: command.run received (not yet implemented)", "id", env.ID)
|
||||
var p api.CommandRunPayload
|
||||
if err := env.UnmarshalPayload(&p); err != nil {
|
||||
return fmt.Errorf("command.run: %w", err)
|
||||
}
|
||||
return d.runJob(ctx, p, tx)
|
||||
|
||||
case api.MsgCommandCancel:
|
||||
slog.Info("ws agent: command.cancel received (not yet implemented)", "id", env.ID)
|
||||
// TODO(P2): cancellation requires keeping a job→cancelFunc map.
|
||||
slog.Info("ws agent: command.cancel received (cancellation lands in P2)", "id", env.ID)
|
||||
|
||||
case api.MsgScheduleSet:
|
||||
slog.Info("ws agent: schedule.set received (not yet implemented)", "id", env.ID)
|
||||
// TODO(P2): apply the schedule.
|
||||
slog.Info("ws agent: schedule.set received (handled in P2)", "id", env.ID)
|
||||
|
||||
case api.MsgConfigUpdate:
|
||||
slog.Info("ws agent: config.update received (not yet implemented)", "id", env.ID)
|
||||
var p api.ConfigUpdatePayload
|
||||
_ = env.UnmarshalPayload(&p)
|
||||
// In-memory only for now — restart loses these. Persistent
|
||||
// secret storage lands with P2's keyring work.
|
||||
if p.RepoURL != "" {
|
||||
d.repoURL = p.RepoURL
|
||||
slog.Info("ws agent: repo URL updated via config.update")
|
||||
}
|
||||
if p.RepoPassword != "" {
|
||||
d.repoPassword = p.RepoPassword
|
||||
slog.Info("ws agent: repo password updated via config.update")
|
||||
}
|
||||
|
||||
case api.MsgAgentUpdateAvail:
|
||||
slog.Info("ws agent: agent.update.available received (not yet implemented)", "id", env.ID)
|
||||
var p api.AgentUpdateAvailablePayload
|
||||
_ = env.UnmarshalPayload(&p)
|
||||
slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL)
|
||||
|
||||
default:
|
||||
slog.Debug("ws agent: ignored message", "type", env.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runJob spawns a runner for one job. We launch a goroutine so the
|
||||
// WS read loop keeps draining messages while restic chugs along.
|
||||
func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error {
|
||||
if d.resticBin == "" {
|
||||
return fmt.Errorf("restic binary not located on this agent")
|
||||
}
|
||||
if d.repoURL == "" || d.repoPassword == "" {
|
||||
return fmt.Errorf("repo credentials not configured (set repo_url + repo_password in agent.yaml or push via config.update)")
|
||||
}
|
||||
r := runner.New(runner.Config{
|
||||
ResticBin: d.resticBin,
|
||||
RepoURL: d.repoURL,
|
||||
RepoPassword: d.repoPassword,
|
||||
}, tx, time.Second)
|
||||
|
||||
switch p.Kind {
|
||||
case api.JobBackup:
|
||||
// Agent.Args carries [paths...]. Excludes/tags are not yet
|
||||
// surfaced over the wire; they come with P2 schedule support.
|
||||
go func() {
|
||||
if err := r.RunBackup(ctx, p.JobID, p.Args, nil, nil); err != nil {
|
||||
slog.Warn("agent: backup job failed", "job_id", p.JobID, "err", err)
|
||||
}
|
||||
}()
|
||||
default:
|
||||
return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*1e9)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
snap, err := sysinfo.Collect(ctx, cfg.ResticPath)
|
||||
|
||||
@@ -33,6 +33,15 @@ type Config struct {
|
||||
// ResticPath overrides the auto-detected restic binary path.
|
||||
ResticPath string `yaml:"restic_path,omitempty"`
|
||||
|
||||
// RepoURL + RepoPassword are the credentials this host uses to
|
||||
// reach its restic repository. Phase 1 keeps these in plaintext
|
||||
// in agent.yaml (mode 0600 owned by the agent service user); the
|
||||
// server-pushed config.update message can override them in
|
||||
// memory. Phase 2 moves them into the OS keyring (DPAPI on
|
||||
// Windows, Secret Service on Linux).
|
||||
RepoURL string `yaml:"repo_url,omitempty"`
|
||||
RepoPassword string `yaml:"repo_password,omitempty"`
|
||||
|
||||
// path is the file we loaded from. Used by Save.
|
||||
path string `yaml:"-"`
|
||||
}
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
// Package runner spawns restic processes for the agent. It owns one
|
||||
// Run() invocation per command.run; concurrency limits live a layer
|
||||
// up (the WS handler).
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/restic"
|
||||
)
|
||||
|
||||
// Sender is the agent's outbound message channel. Provided by
|
||||
// wsclient so the runner can push job.started / job.progress /
|
||||
// job.finished / log.stream back to the server.
|
||||
type Sender interface {
|
||||
Send(env api.Envelope) error
|
||||
}
|
||||
|
||||
// Config bundles the long-lived settings the runner needs. They come
|
||||
// from the agent's config file (server-pushed config.update payloads
|
||||
// override these in memory).
|
||||
type Config struct {
|
||||
ResticBin string
|
||||
RepoURL string
|
||||
RepoPassword string
|
||||
}
|
||||
|
||||
// Runner owns the restic invocations.
|
||||
type Runner struct {
|
||||
cfg Config
|
||||
tx Sender
|
||||
|
||||
// progress throttling: we receive a status event from restic
|
||||
// every ~100ms; the UI doesn't need anywhere near that rate.
|
||||
// Cap WS sends to one per N (configurable; default 1s).
|
||||
progressMinPeriod time.Duration
|
||||
}
|
||||
|
||||
// New builds a Runner. progressMinPeriod = 0 uses the default 1s.
|
||||
func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner {
|
||||
if progressMinPeriod <= 0 {
|
||||
progressMinPeriod = time.Second
|
||||
}
|
||||
return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod}
|
||||
}
|
||||
|
||||
// RunBackup executes a backup job and reports back via the sender.
|
||||
// Returns nil on a clean (or "incomplete-but-snapshot-created") finish.
|
||||
func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error {
|
||||
startedAt := time.Now().UTC()
|
||||
|
||||
startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{
|
||||
JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt,
|
||||
})
|
||||
if err := r.tx.Send(startEnv); err != nil {
|
||||
slog.Warn("runner: send job.started", "err", err)
|
||||
}
|
||||
|
||||
env := restic.Env{
|
||||
Bin: r.cfg.ResticBin,
|
||||
RepoURL: r.cfg.RepoURL,
|
||||
RepoPassword: r.cfg.RepoPassword,
|
||||
}
|
||||
|
||||
var seq atomic.Int64
|
||||
lastProgress := time.Now()
|
||||
|
||||
handle := func(stream string, line string, ev any) {
|
||||
// Forward every line to the server as log.stream.
|
||||
now := time.Now().UTC()
|
||||
logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
|
||||
JobID: jobID,
|
||||
Seq: seq.Add(1),
|
||||
TS: now,
|
||||
Stream: api.LogStream(stream),
|
||||
Payload: line,
|
||||
})
|
||||
_ = r.tx.Send(logEnv)
|
||||
|
||||
// Throttled progress events.
|
||||
if status, ok := ev.(restic.BackupStatus); ok {
|
||||
if time.Since(lastProgress) < r.progressMinPeriod {
|
||||
return
|
||||
}
|
||||
lastProgress = time.Now()
|
||||
progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{
|
||||
JobID: jobID,
|
||||
PercentDone: status.PercentDone,
|
||||
FilesDone: status.FilesDone,
|
||||
TotalFiles: status.TotalFiles,
|
||||
BytesDone: status.BytesDone,
|
||||
TotalBytes: status.TotalBytes,
|
||||
ETASeconds: status.SecondsRem,
|
||||
ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed),
|
||||
})
|
||||
_ = r.tx.Send(progEnv)
|
||||
}
|
||||
}
|
||||
|
||||
summary, err := env.RunBackup(ctx, paths, excludes, tags, handle)
|
||||
finishedAt := time.Now().UTC()
|
||||
|
||||
status := api.JobSucceeded
|
||||
exit := 0
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
status = api.JobFailed
|
||||
exit = -1
|
||||
errMsg = err.Error()
|
||||
}
|
||||
var statsBlob json.RawMessage
|
||||
if summary != nil {
|
||||
statsBlob, _ = json.Marshal(summary)
|
||||
}
|
||||
finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exit,
|
||||
FinishedAt: finishedAt,
|
||||
Stats: statsBlob,
|
||||
Error: errMsg,
|
||||
})
|
||||
_ = r.tx.Send(finEnv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner backup: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func throughput(bytesDone, secondsElapsed int64) int64 {
|
||||
if secondsElapsed <= 0 {
|
||||
return 0
|
||||
}
|
||||
return bytesDone / secondsElapsed
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
stdhttp "net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
@@ -36,10 +37,19 @@ type Config struct {
|
||||
HelloPayload api.HelloPayload
|
||||
}
|
||||
|
||||
// Handler is invoked for every server-sent message. The agent's main
|
||||
// program supplies one that knows how to dispatch command.run etc.
|
||||
// to the runner package.
|
||||
type Handler func(ctx context.Context, env api.Envelope) error
|
||||
// Sender is what handlers use to push agent → server messages
|
||||
// (job.progress, job.finished, log.stream, command.result, …).
|
||||
// Returned by the WS client to the dispatch handler. Write operations
|
||||
// serialise behind a single mutex on the conn; concurrent calls are
|
||||
// safe.
|
||||
type Sender interface {
|
||||
Send(env api.Envelope) error
|
||||
}
|
||||
|
||||
// Handler is invoked for every server-sent message. tx lets the
|
||||
// handler push replies back; it is valid only for the lifetime of
|
||||
// the connection (calls fail if the agent has reconnected since).
|
||||
type Handler func(ctx context.Context, env api.Envelope, tx Sender) error
|
||||
|
||||
// Run keeps the agent connected indefinitely. Returns when ctx is
|
||||
// cancelled. Errors during a single connection attempt are logged and
|
||||
@@ -107,6 +117,8 @@ func connectOnce(ctx context.Context, cfg Config, handle Handler) error {
|
||||
}
|
||||
slog.Info("ws agent connected", "server", wsURL)
|
||||
|
||||
tx := &connSender{conn: conn, ctx: ctx}
|
||||
|
||||
// Heartbeat goroutine.
|
||||
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
|
||||
defer cancelHeartbeat()
|
||||
@@ -138,13 +150,34 @@ func connectOnce(ctx context.Context, cfg Config, handle Handler) error {
|
||||
continue
|
||||
}
|
||||
if handle != nil {
|
||||
if err := handle(ctx, env); err != nil {
|
||||
if err := handle(ctx, env, tx); err != nil {
|
||||
slog.Warn("ws agent: handler returned error", "type", env.Type, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// connSender is the per-connection Sender. Goroutines beyond the
|
||||
// read loop (e.g. a backup running in its own goroutine) keep a
|
||||
// reference to one of these for the duration of their work.
|
||||
type connSender struct {
|
||||
conn *websocket.Conn
|
||||
ctx context.Context
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s *connSender) Send(env api.Envelope) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
raw, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
writeCtx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
return s.conn.Write(writeCtx, websocket.MessageText, raw)
|
||||
}
|
||||
|
||||
func heartbeatLoop(ctx context.Context, conn *websocket.Conn, period time.Duration) {
|
||||
t := time.NewTicker(period)
|
||||
defer t.Stop()
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
// Package restic wraps the restic CLI: locating the binary, invoking
|
||||
// it with --json, and parsing the streamed event payloads.
|
||||
package restic
|
||||
@@ -0,0 +1,216 @@
|
||||
// Package restic wraps the restic CLI: locate the binary, run it
|
||||
// with --json, parse streamed events. The agent calls this; the
|
||||
// control-plane never invokes restic.
|
||||
package restic
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Locate resolves the path to the restic binary. Honour an explicit
|
||||
// override if provided, else fall back to PATH.
|
||||
func Locate(override string) (string, error) {
|
||||
if override != "" {
|
||||
if _, err := exec.LookPath(override); err == nil {
|
||||
return override, nil
|
||||
}
|
||||
return "", fmt.Errorf("restic: configured path %q not executable", override)
|
||||
}
|
||||
bin, err := exec.LookPath("restic")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("restic: not on PATH: %w", err)
|
||||
}
|
||||
return bin, nil
|
||||
}
|
||||
|
||||
// Env is the per-invocation context for a restic command.
|
||||
type Env struct {
|
||||
Bin string // path to restic binary
|
||||
RepoURL string // RESTIC_REPOSITORY
|
||||
RepoPassword string // RESTIC_PASSWORD (passed via env, never argv)
|
||||
ExtraEnv map[string]string // any other RESTIC_* / passthrough
|
||||
WorkDir string // CWD; default = current
|
||||
}
|
||||
|
||||
// EventKind enumerates what we care about in restic's --json output
|
||||
// for `backup`. Restic's other commands emit different shapes; we
|
||||
// switch on message_type.
|
||||
type EventKind string
|
||||
|
||||
const (
|
||||
EventStatus EventKind = "status" // periodic progress
|
||||
EventVerbose EventKind = "verbose_status"
|
||||
EventSummary EventKind = "summary" // emitted once at end of backup
|
||||
EventErrorEvent EventKind = "error"
|
||||
)
|
||||
|
||||
// BackupStatus mirrors the JSON status emitted by `restic backup`.
|
||||
type BackupStatus struct {
|
||||
MessageType string `json:"message_type"`
|
||||
PercentDone float64 `json:"percent_done"`
|
||||
TotalFiles int64 `json:"total_files"`
|
||||
FilesDone int64 `json:"files_done"`
|
||||
TotalBytes int64 `json:"total_bytes"`
|
||||
BytesDone int64 `json:"bytes_done"`
|
||||
SecondsElapsed int64 `json:"seconds_elapsed"`
|
||||
SecondsRem int64 `json:"seconds_remaining"`
|
||||
}
|
||||
|
||||
// BackupSummary mirrors the JSON summary block.
|
||||
type BackupSummary struct {
|
||||
MessageType string `json:"message_type"`
|
||||
FilesNew int64 `json:"files_new"`
|
||||
FilesChanged int64 `json:"files_changed"`
|
||||
FilesUnmodified int64 `json:"files_unmodified"`
|
||||
DirsNew int64 `json:"dirs_new"`
|
||||
DirsChanged int64 `json:"dirs_changed"`
|
||||
DirsUnmodified int64 `json:"dirs_unmodified"`
|
||||
DataAdded int64 `json:"data_added"`
|
||||
TotalFilesProcessed int64 `json:"total_files_processed"`
|
||||
TotalBytesProcessed int64 `json:"total_bytes_processed"`
|
||||
TotalDuration float64 `json:"total_duration"`
|
||||
SnapshotID string `json:"snapshot_id"`
|
||||
}
|
||||
|
||||
// LineHandler receives every stdout/stderr line. event is non-nil
|
||||
// when the line is a recognised JSON status; raw always carries the
|
||||
// original text (so we can also tee to job_logs as `stdout`).
|
||||
type LineHandler func(stream string, raw string, event any)
|
||||
|
||||
// RunBackup executes `restic backup [paths...]` with --json and pumps
|
||||
// status/summary into handle. Returns nil on success (exit code 0
|
||||
// or 3 — 3 means "completed but had issues"; restic considers it a
|
||||
// success). Other exit codes propagate as an error.
|
||||
func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, handle LineHandler) (*BackupSummary, error) {
|
||||
args := []string{"backup", "--json"}
|
||||
for _, ex := range excludes {
|
||||
args = append(args, "--exclude", ex)
|
||||
}
|
||||
for _, tag := range tags {
|
||||
args = append(args, "--tag", tag)
|
||||
}
|
||||
args = append(args, paths...)
|
||||
|
||||
cmd := exec.CommandContext(ctx, e.Bin, args...)
|
||||
cmd.Env = e.envSlice()
|
||||
cmd.Dir = e.WorkDir
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("restic backup: stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("restic backup: stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("restic backup: start: %w", err)
|
||||
}
|
||||
|
||||
var summary *BackupSummary
|
||||
done := make(chan error, 2)
|
||||
go func() { done <- pumpStdout(stdout, handle, &summary) }()
|
||||
go func() { done <- pumpStderr(stderr, handle) }()
|
||||
|
||||
// Wait for both pumps + the process.
|
||||
for i := 0; i < 2; i++ {
|
||||
if err := <-done; err != nil && handle != nil {
|
||||
handle("event", fmt.Sprintf("pump error: %v", err), nil)
|
||||
}
|
||||
}
|
||||
werr := cmd.Wait()
|
||||
if werr != nil {
|
||||
var ee *exec.ExitError
|
||||
if errors.As(werr, &ee) && ee.ExitCode() == 3 {
|
||||
// "incomplete backup" — restic still produced a snapshot.
|
||||
return summary, nil
|
||||
}
|
||||
return summary, fmt.Errorf("restic backup: %w", werr)
|
||||
}
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
// envSlice converts Env's typed fields into the os/exec env shape.
|
||||
func (e Env) envSlice() []string {
|
||||
out := []string{
|
||||
"RESTIC_REPOSITORY=" + e.RepoURL,
|
||||
"RESTIC_PASSWORD=" + e.RepoPassword,
|
||||
// Feed restic via env-only — keeps creds off ps(1).
|
||||
"PATH=/usr/local/bin:/usr/bin:/bin",
|
||||
}
|
||||
for k, v := range e.ExtraEnv {
|
||||
out = append(out, k+"="+v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func pumpStdout(r io.Reader, handle LineHandler, summary **BackupSummary) error {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) // status lines can get long
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if handle == nil {
|
||||
continue
|
||||
}
|
||||
// Sniff message_type without a full Unmarshal so non-JSON
|
||||
// lines (very rare on stdout, but possible) survive.
|
||||
if !strings.HasPrefix(line, "{") {
|
||||
handle("stdout", line, nil)
|
||||
continue
|
||||
}
|
||||
var probe struct {
|
||||
MessageType string `json:"message_type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(line), &probe); err != nil {
|
||||
handle("stdout", line, nil)
|
||||
continue
|
||||
}
|
||||
switch EventKind(probe.MessageType) {
|
||||
case EventStatus, EventVerbose:
|
||||
var ev BackupStatus
|
||||
if json.Unmarshal([]byte(line), &ev) == nil {
|
||||
handle("event", line, ev)
|
||||
continue
|
||||
}
|
||||
case EventSummary:
|
||||
var ev BackupSummary
|
||||
if json.Unmarshal([]byte(line), &ev) == nil {
|
||||
if summary != nil {
|
||||
s := ev
|
||||
*summary = &s
|
||||
}
|
||||
handle("event", line, ev)
|
||||
continue
|
||||
}
|
||||
case EventErrorEvent:
|
||||
handle("event", line, nil)
|
||||
continue
|
||||
}
|
||||
handle("stdout", line, nil)
|
||||
}
|
||||
return scanner.Err()
|
||||
}
|
||||
|
||||
func pumpStderr(r io.Reader, handle LineHandler) error {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
for scanner.Scan() {
|
||||
if handle != nil {
|
||||
handle("stderr", scanner.Text(), nil)
|
||||
}
|
||||
}
|
||||
return scanner.Err()
|
||||
}
|
||||
|
||||
// suppress unused-time false-positive when nothing else in this file
|
||||
// uses time but the file is part of a package that grows over time
|
||||
var _ = time.Now
|
||||
@@ -0,0 +1,134 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
stdhttp "net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||
)
|
||||
|
||||
// runNowRequest is the body of POST /api/hosts/:id/jobs.
|
||||
type runNowRequest struct {
|
||||
Kind api.JobKind `json:"kind"`
|
||||
Args []string `json:"args,omitempty"` // restic CLI args (paths for backup, etc.)
|
||||
}
|
||||
|
||||
type runNowResponse struct {
|
||||
JobID string `json:"job_id"`
|
||||
Status string `json:"status"` // "queued"
|
||||
}
|
||||
|
||||
// handleRunNow dispatches a job to the named host. Authenticated;
|
||||
// rejects if the host isn't connected (caller should retry once
|
||||
// the agent comes back).
|
||||
func (s *Server) handleRunNow(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
||||
user, ok := s.requireUser(r)
|
||||
if !ok {
|
||||
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "")
|
||||
return
|
||||
}
|
||||
|
||||
hostID := chi.URLParam(r, "id")
|
||||
if hostID == "" {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "")
|
||||
return
|
||||
}
|
||||
|
||||
var req runNowRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error())
|
||||
return
|
||||
}
|
||||
if !validJobKind(req.Kind) {
|
||||
writeJSONError(w, stdhttp.StatusBadRequest, "invalid_kind",
|
||||
"kind must be one of backup|forget|prune|check|unlock")
|
||||
return
|
||||
}
|
||||
|
||||
host, err := s.deps.Store.GetHost(r.Context(), hostID)
|
||||
if err != nil {
|
||||
writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "")
|
||||
return
|
||||
}
|
||||
|
||||
if !s.deps.Hub.Connected(host.ID) {
|
||||
writeJSONError(w, stdhttp.StatusServiceUnavailable, "host_offline",
|
||||
"agent is not currently connected; try again when it reconnects")
|
||||
return
|
||||
}
|
||||
|
||||
jobID := ulid.Make().String()
|
||||
now := time.Now().UTC()
|
||||
if err := s.deps.Store.CreateJob(r.Context(), store.Job{
|
||||
ID: jobID,
|
||||
HostID: host.ID,
|
||||
Kind: string(req.Kind),
|
||||
ActorKind: "user",
|
||||
ActorID: &user.ID,
|
||||
CreatedAt: now,
|
||||
}); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
|
||||
return
|
||||
}
|
||||
|
||||
env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{
|
||||
JobID: jobID,
|
||||
Kind: req.Kind,
|
||||
Args: req.Args,
|
||||
})
|
||||
if err != nil {
|
||||
writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "")
|
||||
return
|
||||
}
|
||||
if err := s.deps.Hub.Send(r.Context(), host.ID, env); err != nil {
|
||||
writeJSONError(w, stdhttp.StatusServiceUnavailable, "host_offline", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
_ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{
|
||||
ID: ulid.Make().String(),
|
||||
UserID: &user.ID,
|
||||
Actor: "user",
|
||||
Action: "job.run_now",
|
||||
TargetKind: ptr("job"),
|
||||
TargetID: &jobID,
|
||||
TS: now,
|
||||
})
|
||||
|
||||
writeJSON(w, stdhttp.StatusAccepted, runNowResponse{
|
||||
JobID: jobID,
|
||||
Status: "queued",
|
||||
})
|
||||
}
|
||||
|
||||
// requireUser resolves the session cookie to a user row. Stub of the
|
||||
// session-auth middleware that lands in P1-04's full pass.
|
||||
func (s *Server) requireUser(r *stdhttp.Request) (*store.User, bool) {
|
||||
c, err := r.Cookie(sessionCookieName)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
sess, err := s.deps.Store.LookupSession(r.Context(), auth.HashToken(c.Value))
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
u, err := s.deps.Store.GetUserByID(r.Context(), sess.UserID)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return u, true
|
||||
}
|
||||
|
||||
func validJobKind(k api.JobKind) bool {
|
||||
switch k {
|
||||
case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck, api.JobUnlock:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -83,6 +83,9 @@ func (s *Server) routes(r chi.Router) {
|
||||
// /hosts/{id}/enrollment-token (regenerate) lands when the
|
||||
// host page can call it; for now just the create endpoint.
|
||||
r.Post("/enrollment-tokens", s.handleCreateEnrollmentToken)
|
||||
|
||||
// Run-now: dispatch a job to a host's agent.
|
||||
r.Post("/hosts/{id}/jobs", s.handleRunNow)
|
||||
})
|
||||
|
||||
// Agent ↔ server WebSocket. Bearer-authenticated inside the handler.
|
||||
|
||||
@@ -149,18 +149,44 @@ func runAgentLoop(ctx context.Context, c *Conn, hostID string, deps HandlerDeps)
|
||||
}
|
||||
}
|
||||
|
||||
// dispatchAgentMessage routes a single envelope to its handler. Only
|
||||
// hello + heartbeat are wired up in Phase 1's first slice; the rest
|
||||
// land with P1-18+ (jobs) and P2 (schedules).
|
||||
// dispatchAgentMessage routes a single envelope to its handler.
|
||||
func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.Envelope, deps HandlerDeps) {
|
||||
switch env.Type {
|
||||
case api.MsgHeartbeat:
|
||||
_ = deps.Store.TouchHost(ctx, hostID, time.Now().UTC())
|
||||
|
||||
case api.MsgJobStarted, api.MsgJobProgress, api.MsgJobFinished,
|
||||
api.MsgLogStream, api.MsgSnapshotsRpt, api.MsgRepoStats,
|
||||
api.MsgScheduleAck, api.MsgCommandResult:
|
||||
// TODO(P1-18+): persist + fan out to subscribed browsers.
|
||||
case api.MsgJobStarted:
|
||||
var p api.JobStartedPayload
|
||||
_ = env.UnmarshalPayload(&p)
|
||||
if err := deps.Store.MarkJobStarted(ctx, p.JobID, p.StartedAt); err != nil {
|
||||
slog.Warn("ws: mark job started", "job_id", p.JobID, "err", err)
|
||||
}
|
||||
|
||||
case api.MsgJobProgress:
|
||||
// We don't persist every progress tick; the live UI subscribes
|
||||
// to a fan-out channel that lands with P1-21 / the UI work.
|
||||
// TODO: implement the ws fan-out hub for browsers.
|
||||
_ = env
|
||||
|
||||
case api.MsgJobFinished:
|
||||
var p api.JobFinishedPayload
|
||||
_ = env.UnmarshalPayload(&p)
|
||||
errMsg := p.Error
|
||||
if err := deps.Store.MarkJobFinished(ctx, p.JobID,
|
||||
string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil {
|
||||
slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err)
|
||||
}
|
||||
|
||||
case api.MsgLogStream:
|
||||
var p api.LogStreamLine
|
||||
_ = env.UnmarshalPayload(&p)
|
||||
if err := deps.Store.AppendJobLog(ctx, p.JobID, p.Seq, p.TS,
|
||||
string(p.Stream), p.Payload); err != nil {
|
||||
slog.Warn("ws: append job log", "job_id", p.JobID, "err", err)
|
||||
}
|
||||
|
||||
case api.MsgSnapshotsRpt, api.MsgRepoStats, api.MsgScheduleAck, api.MsgCommandResult:
|
||||
// TODO(P1-22 + P2): persist these projections.
|
||||
slog.Debug("ws msg not yet handled", "type", env.Type, "host_id", hostID)
|
||||
|
||||
case api.MsgError:
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Job mirrors the jobs table.
|
||||
type Job struct {
|
||||
ID string
|
||||
HostID string
|
||||
Kind string
|
||||
Status string
|
||||
ScheduledID *string
|
||||
ActorKind string // user|schedule|system
|
||||
ActorID *string
|
||||
StartedAt *time.Time
|
||||
FinishedAt *time.Time
|
||||
ExitCode *int
|
||||
Stats json.RawMessage
|
||||
Error *string
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// CreateJob inserts a queued job. The agent will mark it running
|
||||
// when it actually starts work.
|
||||
func (s *Store) CreateJob(ctx context.Context, j Job) error {
|
||||
_, err := s.db.ExecContext(ctx,
|
||||
`INSERT INTO jobs (id, host_id, kind, status, actor_kind, actor_id, created_at)
|
||||
VALUES (?, ?, ?, 'queued', ?, ?, ?)`,
|
||||
j.ID, j.HostID, j.Kind, j.ActorKind, nullable(j.ActorID),
|
||||
j.CreatedAt.UTC().Format(time.RFC3339Nano))
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: create job: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkJobStarted flips status to 'running' and records started_at.
|
||||
func (s *Store) MarkJobStarted(ctx context.Context, id string, when time.Time) error {
|
||||
res, err := s.db.ExecContext(ctx,
|
||||
`UPDATE jobs
|
||||
SET status = 'running', started_at = ?
|
||||
WHERE id = ? AND status IN ('queued','running')`,
|
||||
when.UTC().Format(time.RFC3339Nano), id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: mark started: %w", err)
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n == 0 {
|
||||
return ErrNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkJobFinished records the terminal state.
|
||||
func (s *Store) MarkJobFinished(ctx context.Context, id, status string, exitCode int, stats json.RawMessage, errMsg string, when time.Time) error {
|
||||
if len(stats) == 0 {
|
||||
stats = json.RawMessage("null")
|
||||
}
|
||||
res, err := s.db.ExecContext(ctx,
|
||||
`UPDATE jobs
|
||||
SET status = ?, finished_at = ?, exit_code = ?, stats = ?, error = ?
|
||||
WHERE id = ?`,
|
||||
status,
|
||||
when.UTC().Format(time.RFC3339Nano),
|
||||
exitCode, string(stats), nullableStr(errMsg), id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: mark finished: %w", err)
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n == 0 {
|
||||
return ErrNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AppendJobLog records one line of agent output. seq is the agent's
|
||||
// monotonic sequence number; gaps imply lost data.
|
||||
func (s *Store) AppendJobLog(ctx context.Context, jobID string, seq int64, ts time.Time, stream, payload string) error {
|
||||
_, err := s.db.ExecContext(ctx,
|
||||
`INSERT INTO job_logs (job_id, seq, ts, stream, payload) VALUES (?,?,?,?,?)`,
|
||||
jobID, seq, ts.UTC().Format(time.RFC3339Nano), stream, payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("store: append job log: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJob returns a job row.
|
||||
func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
|
||||
row := s.db.QueryRowContext(ctx,
|
||||
`SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id,
|
||||
started_at, finished_at, exit_code, stats, error, created_at
|
||||
FROM jobs WHERE id = ?`, id)
|
||||
var (
|
||||
j Job
|
||||
schedID sql.NullString
|
||||
actorID sql.NullString
|
||||
startedAt sql.NullString
|
||||
finishedAt sql.NullString
|
||||
exitCode sql.NullInt64
|
||||
stats sql.NullString
|
||||
errMsg sql.NullString
|
||||
createdAt string
|
||||
)
|
||||
if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID,
|
||||
&j.ActorKind, &actorID, &startedAt, &finishedAt,
|
||||
&exitCode, &stats, &errMsg, &createdAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("store: scan job: %w", err)
|
||||
}
|
||||
if schedID.Valid {
|
||||
s := schedID.String
|
||||
j.ScheduledID = &s
|
||||
}
|
||||
if actorID.Valid {
|
||||
s := actorID.String
|
||||
j.ActorID = &s
|
||||
}
|
||||
if startedAt.Valid {
|
||||
t, _ := time.Parse(time.RFC3339Nano, startedAt.String)
|
||||
j.StartedAt = &t
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
t, _ := time.Parse(time.RFC3339Nano, finishedAt.String)
|
||||
j.FinishedAt = &t
|
||||
}
|
||||
if exitCode.Valid {
|
||||
i := int(exitCode.Int64)
|
||||
j.ExitCode = &i
|
||||
}
|
||||
if stats.Valid && stats.String != "" {
|
||||
j.Stats = json.RawMessage(stats.String)
|
||||
}
|
||||
if errMsg.Valid {
|
||||
s := errMsg.String
|
||||
j.Error = &s
|
||||
}
|
||||
t, _ := time.Parse(time.RFC3339Nano, createdAt)
|
||||
j.CreatedAt = t
|
||||
return &j, nil
|
||||
}
|
||||
|
||||
func nullableStr(s string) any {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
Reference in New Issue
Block a user