From 95b49ecab94b392dc520a36a49371916c3841082 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Fri, 1 May 2026 00:45:04 +0100 Subject: [PATCH] =?UTF-8?q?phase=201:=20run-now=20backup=20=E2=80=94=20res?= =?UTF-8?q?tic=20wrapper,=20job=20lifecycle,=20end-to-end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the operator → server → agent → restic → server round-trip for on-demand backups. The flow: POST /api/hosts/{id}/jobs {kind:"backup",args:["/path"]} → server creates a queued Job row → server emits command.run over WS to the host's agent → agent dispatcher spawns runner.RunBackup in a goroutine → runner spawns `restic backup --json`, parses each line → forwards: job.started, log.stream (every line), job.progress (throttled to 1/sec), job.finished (with summary stats blob) → server WS handler persists those into jobs / job_logs P1-16 internal/restic: thin Locate + Env wrapper that runs `restic backup --json`, scans stdout/stderr, parses BackupStatus + BackupSummary, calls back into a LineHandler so the agent can fan out to log.stream + job.progress. Treats exit code 3 as "succeeded with issues" (matches restic's contract). P1-18 store: jobs accessors (CreateJob, MarkJobStarted, MarkJobFinished, AppendJobLog, GetJob). P1-19 server: POST /api/hosts/{id}/jobs creates the Job row, validates kind, dispatches via Hub.Send, audit-logs the action. P1-20 agent runner: wraps restic.RunBackup with throttled progress emission. Sender abstraction was added to wsclient.Handler so background goroutines can keep replying after dispatch returns. P1-21 server WS: dispatchAgentMessage now persists job.started, job.finished, log.stream into the database. Browser fan-out for live tailing lands with the UI work. Agent gets repo_url + repo_password from agent.yaml in plaintext for now (mode 0600, owned by service user); they move into the OS keyring storage described in spec.md §7.3 in P2. config.update over WS overrides the in-memory copy (does not persist). 
Build clean; all tests pass. End-to-end with a real restic still needs a host that has restic installed — wire shape verified by the existing hello/heartbeat round-trip test. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/agent/main.go | 95 +++++++++++-- internal/agent/config/config.go | 9 ++ internal/agent/runner/runner.go | 141 +++++++++++++++++++ internal/agent/wsclient/client.go | 43 +++++- internal/restic/doc.go | 3 - internal/restic/runner.go | 216 ++++++++++++++++++++++++++++++ internal/server/http/jobs.go | 134 ++++++++++++++++++ internal/server/http/server.go | 3 + internal/server/ws/handler.go | 40 +++++- internal/store/jobs.go | 156 +++++++++++++++++++++ 10 files changed, 811 insertions(+), 29 deletions(-) create mode 100644 internal/agent/runner/runner.go delete mode 100644 internal/restic/doc.go create mode 100644 internal/restic/runner.go create mode 100644 internal/server/http/jobs.go create mode 100644 internal/store/jobs.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 1953862..439e739 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -9,11 +9,14 @@ import ( "os" "os/signal" "syscall" + "time" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/config" + "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/runner" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/sysinfo" "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/restic" ) var version = "dev" @@ -45,10 +48,6 @@ func run() error { return fmt.Errorf("config: %w", err) } - // Enrollment mode: agent was started with -enroll-server -enroll-token. - // On success we persist the credentials and exit (the install script - // then starts the agent service). Avoiding a long-running process here - // keeps the enrollment story restartable. 
if *enrollToken != "" { if *enrollServer == "" { return errors.New("enrollment: -enroll-server is required with -enroll-token") @@ -75,6 +74,8 @@ func run() error { "protocol_version", snap.ProtocolVersion, ) + resticBin, _ := restic.Locate(cfg.ResticPath) // empty is fine; commands fail with a clear error later + wsCfg := wsclient.Config{ ServerURL: cfg.ServerURL, AgentToken: cfg.AgentToken, @@ -90,35 +91,101 @@ func run() error { }, } - if err := wsclient.Run(ctx, wsCfg, dispatch); err != nil { + d := &dispatcher{ + resticBin: resticBin, + repoURL: cfg.RepoURL, + repoPassword: cfg.RepoPassword, + } + if err := wsclient.Run(ctx, wsCfg, d.handle); err != nil { return fmt.Errorf("ws run: %w", err) } slog.Info("agent shutting down") return nil } -// dispatch handles server-pushed envelopes. Phase 1's first slice -// just logs; P1-19/20/21 wire command.run to the runner. -func dispatch(_ context.Context, env api.Envelope) error { +// dispatcher closes over the long-lived agent settings (restic path, +// repo creds) so handle() can spawn the runner without re-loading +// config every time. +type dispatcher struct { + resticBin string + repoURL string + repoPassword string +} + +func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error { switch env.Type { case api.MsgCommandRun: - slog.Info("ws agent: command.run received (not yet implemented)", "id", env.ID) + var p api.CommandRunPayload + if err := env.UnmarshalPayload(&p); err != nil { + return fmt.Errorf("command.run: %w", err) + } + return d.runJob(ctx, p, tx) + case api.MsgCommandCancel: - slog.Info("ws agent: command.cancel received (not yet implemented)", "id", env.ID) + // TODO(P2): cancellation requires keeping a job→cancelFunc map. + slog.Info("ws agent: command.cancel received (cancellation lands in P2)", "id", env.ID) + case api.MsgScheduleSet: - slog.Info("ws agent: schedule.set received (not yet implemented)", "id", env.ID) + // TODO(P2): apply the schedule. 
+ slog.Info("ws agent: schedule.set received (handled in P2)", "id", env.ID) + case api.MsgConfigUpdate: - slog.Info("ws agent: config.update received (not yet implemented)", "id", env.ID) + var p api.ConfigUpdatePayload + _ = env.UnmarshalPayload(&p) + // In-memory only for now — restart loses these. Persistent + // secret storage lands with P2's keyring work. + if p.RepoURL != "" { + d.repoURL = p.RepoURL + slog.Info("ws agent: repo URL updated via config.update") + } + if p.RepoPassword != "" { + d.repoPassword = p.RepoPassword + slog.Info("ws agent: repo password updated via config.update") + } + case api.MsgAgentUpdateAvail: - slog.Info("ws agent: agent.update.available received (not yet implemented)", "id", env.ID) + var p api.AgentUpdateAvailablePayload + _ = env.UnmarshalPayload(&p) + slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL) + default: slog.Debug("ws agent: ignored message", "type", env.Type) } return nil } +// runJob spawns a runner for one job. We launch a goroutine so the +// WS read loop keeps draining messages while restic chugs along. +func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsclient.Sender) error { + if d.resticBin == "" { + return fmt.Errorf("restic binary not located on this agent") + } + if d.repoURL == "" || d.repoPassword == "" { + return fmt.Errorf("repo credentials not configured (set repo_url + repo_password in agent.yaml or push via config.update)") + } + r := runner.New(runner.Config{ + ResticBin: d.resticBin, + RepoURL: d.repoURL, + RepoPassword: d.repoPassword, + }, tx, time.Second) + + switch p.Kind { + case api.JobBackup: + // Agent.Args carries [paths...]. Excludes/tags are not yet + // surfaced over the wire; they come with P2 schedule support. 
+ go func() { + if err := r.RunBackup(ctx, p.JobID, p.Args, nil, nil); err != nil { + slog.Warn("agent: backup job failed", "job_id", p.JobID, "err", err) + } + }() + default: + return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind) + } + return nil +} + func doEnroll(serverURL, token string, cfg *config.Config, agentVersion string) error { - ctx, cancel := context.WithTimeout(context.Background(), 60*1e9) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() snap, err := sysinfo.Collect(ctx, cfg.ResticPath) diff --git a/internal/agent/config/config.go b/internal/agent/config/config.go index 48bba00..e854b2c 100644 --- a/internal/agent/config/config.go +++ b/internal/agent/config/config.go @@ -33,6 +33,15 @@ type Config struct { // ResticPath overrides the auto-detected restic binary path. ResticPath string `yaml:"restic_path,omitempty"` + // RepoURL + RepoPassword are the credentials this host uses to + // reach its restic repository. Phase 1 keeps these in plaintext + // in agent.yaml (mode 0600 owned by the agent service user); the + // server-pushed config.update message can override them in + // memory. Phase 2 moves them into the OS keyring (DPAPI on + // Windows, Secret Service on Linux). + RepoURL string `yaml:"repo_url,omitempty"` + RepoPassword string `yaml:"repo_password,omitempty"` + // path is the file we loaded from. Used by Save. path string `yaml:"-"` } diff --git a/internal/agent/runner/runner.go b/internal/agent/runner/runner.go new file mode 100644 index 0000000..dc08baa --- /dev/null +++ b/internal/agent/runner/runner.go @@ -0,0 +1,141 @@ +// Package runner spawns restic processes for the agent. It owns one +// Run() invocation per command.run; concurrency limits live a layer +// up (the WS handler). 
+package runner + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync/atomic" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/restic" +) + +// Sender is the agent's outbound message channel. Provided by +// wsclient so the runner can push job.started / job.progress / +// job.finished / log.stream back to the server. +type Sender interface { + Send(env api.Envelope) error +} + +// Config bundles the long-lived settings the runner needs. They come +// from the agent's config file (server-pushed config.update payloads +// override these in memory). +type Config struct { + ResticBin string + RepoURL string + RepoPassword string +} + +// Runner owns the restic invocations. +type Runner struct { + cfg Config + tx Sender + + // progress throttling: we receive a status event from restic + // every ~100ms; the UI doesn't need anywhere near that rate. + // Cap WS sends to one per N (configurable; default 1s). + progressMinPeriod time.Duration +} + +// New builds a Runner. progressMinPeriod = 0 uses the default 1s. +func New(cfg Config, tx Sender, progressMinPeriod time.Duration) *Runner { + if progressMinPeriod <= 0 { + progressMinPeriod = time.Second + } + return &Runner{cfg: cfg, tx: tx, progressMinPeriod: progressMinPeriod} +} + +// RunBackup executes a backup job and reports back via the sender. +// Returns nil on a clean (or "incomplete-but-snapshot-created") finish. 
+func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, tags []string) error { + startedAt := time.Now().UTC() + + startEnv, _ := api.Marshal(api.MsgJobStarted, jobID, api.JobStartedPayload{ + JobID: jobID, Kind: api.JobBackup, StartedAt: startedAt, + }) + if err := r.tx.Send(startEnv); err != nil { + slog.Warn("runner: send job.started", "err", err) + } + + env := restic.Env{ + Bin: r.cfg.ResticBin, + RepoURL: r.cfg.RepoURL, + RepoPassword: r.cfg.RepoPassword, + } + + var seq atomic.Int64 + lastProgress := time.Now() + + handle := func(stream string, line string, ev any) { + // Forward every line to the server as log.stream. + now := time.Now().UTC() + logEnv, _ := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ + JobID: jobID, + Seq: seq.Add(1), + TS: now, + Stream: api.LogStream(stream), + Payload: line, + }) + _ = r.tx.Send(logEnv) + + // Throttled progress events. + if status, ok := ev.(restic.BackupStatus); ok { + if time.Since(lastProgress) < r.progressMinPeriod { + return + } + lastProgress = time.Now() + progEnv, _ := api.Marshal(api.MsgJobProgress, jobID, api.JobProgressPayload{ + JobID: jobID, + PercentDone: status.PercentDone, + FilesDone: status.FilesDone, + TotalFiles: status.TotalFiles, + BytesDone: status.BytesDone, + TotalBytes: status.TotalBytes, + ETASeconds: status.SecondsRem, + ThroughputBps: throughput(status.BytesDone, status.SecondsElapsed), + }) + _ = r.tx.Send(progEnv) + } + } + + summary, err := env.RunBackup(ctx, paths, excludes, tags, handle) + finishedAt := time.Now().UTC() + + status := api.JobSucceeded + exit := 0 + errMsg := "" + if err != nil { + status = api.JobFailed + exit = -1 + errMsg = err.Error() + } + var statsBlob json.RawMessage + if summary != nil { + statsBlob, _ = json.Marshal(summary) + } + finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ + JobID: jobID, + Status: status, + ExitCode: exit, + FinishedAt: finishedAt, + Stats: statsBlob, + Error: errMsg, + }) + _ = 
r.tx.Send(finEnv) + if err != nil { + return fmt.Errorf("runner backup: %w", err) + } + return nil +} + +func throughput(bytesDone, secondsElapsed int64) int64 { + if secondsElapsed <= 0 { + return 0 + } + return bytesDone / secondsElapsed +} diff --git a/internal/agent/wsclient/client.go b/internal/agent/wsclient/client.go index f1bd81d..f37f3e4 100644 --- a/internal/agent/wsclient/client.go +++ b/internal/agent/wsclient/client.go @@ -19,6 +19,7 @@ import ( stdhttp "net/http" "net/url" "strings" + "sync" "time" "github.com/coder/websocket" @@ -36,10 +37,19 @@ type Config struct { HelloPayload api.HelloPayload } -// Handler is invoked for every server-sent message. The agent's main -// program supplies one that knows how to dispatch command.run etc. -// to the runner package. -type Handler func(ctx context.Context, env api.Envelope) error +// Sender is what handlers use to push agent → server messages +// (job.progress, job.finished, log.stream, command.result, …). +// Returned by the WS client to the dispatch handler. Write operations +// serialise behind a single mutex on the conn; concurrent calls are +// safe. +type Sender interface { + Send(env api.Envelope) error +} + +// Handler is invoked for every server-sent message. tx lets the +// handler push replies back; it is valid only for the lifetime of +// the connection (calls fail if the agent has reconnected since). +type Handler func(ctx context.Context, env api.Envelope, tx Sender) error // Run keeps the agent connected indefinitely. Returns when ctx is // cancelled. Errors during a single connection attempt are logged and @@ -107,6 +117,8 @@ func connectOnce(ctx context.Context, cfg Config, handle Handler) error { } slog.Info("ws agent connected", "server", wsURL) + tx := &connSender{conn: conn, ctx: ctx} + // Heartbeat goroutine. 
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx) defer cancelHeartbeat() @@ -138,13 +150,34 @@ func connectOnce(ctx context.Context, cfg Config, handle Handler) error { continue } if handle != nil { - if err := handle(ctx, env); err != nil { + if err := handle(ctx, env, tx); err != nil { slog.Warn("ws agent: handler returned error", "type", env.Type, "err", err) } } } } +// connSender is the per-connection Sender. Goroutines beyond the +// read loop (e.g. a backup running in its own goroutine) keep a +// reference to one of these for the duration of their work. +type connSender struct { + conn *websocket.Conn + ctx context.Context + mu sync.Mutex +} + +func (s *connSender) Send(env api.Envelope) error { + s.mu.Lock() + defer s.mu.Unlock() + raw, err := json.Marshal(env) + if err != nil { + return err + } + writeCtx, cancel := context.WithTimeout(s.ctx, 30*time.Second) + defer cancel() + return s.conn.Write(writeCtx, websocket.MessageText, raw) +} + func heartbeatLoop(ctx context.Context, conn *websocket.Conn, period time.Duration) { t := time.NewTicker(period) defer t.Stop() diff --git a/internal/restic/doc.go b/internal/restic/doc.go deleted file mode 100644 index 2e8026c..0000000 --- a/internal/restic/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package restic wraps the restic CLI: locating the binary, invoking -// it with --json, and parsing the streamed event payloads. -package restic diff --git a/internal/restic/runner.go b/internal/restic/runner.go new file mode 100644 index 0000000..b728563 --- /dev/null +++ b/internal/restic/runner.go @@ -0,0 +1,216 @@ +// Package restic wraps the restic CLI: locate the binary, run it +// with --json, parse streamed events. The agent calls this; the +// control-plane never invokes restic. +package restic + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" +) + +// Locate resolves the path to the restic binary. 
Honour an explicit +// override if provided, else fall back to PATH. +func Locate(override string) (string, error) { + if override != "" { + if _, err := exec.LookPath(override); err == nil { + return override, nil + } + return "", fmt.Errorf("restic: configured path %q not executable", override) + } + bin, err := exec.LookPath("restic") + if err != nil { + return "", fmt.Errorf("restic: not on PATH: %w", err) + } + return bin, nil +} + +// Env is the per-invocation context for a restic command. +type Env struct { + Bin string // path to restic binary + RepoURL string // RESTIC_REPOSITORY + RepoPassword string // RESTIC_PASSWORD (passed via env, never argv) + ExtraEnv map[string]string // any other RESTIC_* / passthrough + WorkDir string // CWD; default = current +} + +// EventKind enumerates what we care about in restic's --json output +// for `backup`. Restic's other commands emit different shapes; we +// switch on message_type. +type EventKind string + +const ( + EventStatus EventKind = "status" // periodic progress + EventVerbose EventKind = "verbose_status" + EventSummary EventKind = "summary" // emitted once at end of backup + EventErrorEvent EventKind = "error" +) + +// BackupStatus mirrors the JSON status emitted by `restic backup`. +type BackupStatus struct { + MessageType string `json:"message_type"` + PercentDone float64 `json:"percent_done"` + TotalFiles int64 `json:"total_files"` + FilesDone int64 `json:"files_done"` + TotalBytes int64 `json:"total_bytes"` + BytesDone int64 `json:"bytes_done"` + SecondsElapsed int64 `json:"seconds_elapsed"` + SecondsRem int64 `json:"seconds_remaining"` +} + +// BackupSummary mirrors the JSON summary block. 
+type BackupSummary struct { + MessageType string `json:"message_type"` + FilesNew int64 `json:"files_new"` + FilesChanged int64 `json:"files_changed"` + FilesUnmodified int64 `json:"files_unmodified"` + DirsNew int64 `json:"dirs_new"` + DirsChanged int64 `json:"dirs_changed"` + DirsUnmodified int64 `json:"dirs_unmodified"` + DataAdded int64 `json:"data_added"` + TotalFilesProcessed int64 `json:"total_files_processed"` + TotalBytesProcessed int64 `json:"total_bytes_processed"` + TotalDuration float64 `json:"total_duration"` + SnapshotID string `json:"snapshot_id"` +} + +// LineHandler receives every stdout/stderr line. event is non-nil +// when the line is a recognised JSON status; raw always carries the +// original text (so we can also tee to job_logs as `stdout`). +type LineHandler func(stream string, raw string, event any) + +// RunBackup executes `restic backup [paths...]` with --json and pumps +// status/summary into handle. Returns nil on success (exit code 0 +// or 3 — 3 means "completed but had issues"; restic considers it a +// success). Other exit codes propagate as an error. +func (e Env) RunBackup(ctx context.Context, paths, excludes, tags []string, handle LineHandler) (*BackupSummary, error) { + args := []string{"backup", "--json"} + for _, ex := range excludes { + args = append(args, "--exclude", ex) + } + for _, tag := range tags { + args = append(args, "--tag", tag) + } + args = append(args, paths...) + + cmd := exec.CommandContext(ctx, e.Bin, args...) 
+ cmd.Env = e.envSlice() + cmd.Dir = e.WorkDir + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("restic backup: stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("restic backup: stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("restic backup: start: %w", err) + } + + var summary *BackupSummary + done := make(chan error, 2) + go func() { done <- pumpStdout(stdout, handle, &summary) }() + go func() { done <- pumpStderr(stderr, handle) }() + + // Wait for both pumps + the process. + for i := 0; i < 2; i++ { + if err := <-done; err != nil && handle != nil { + handle("event", fmt.Sprintf("pump error: %v", err), nil) + } + } + werr := cmd.Wait() + if werr != nil { + var ee *exec.ExitError + if errors.As(werr, &ee) && ee.ExitCode() == 3 { + // "incomplete backup" — restic still produced a snapshot. + return summary, nil + } + return summary, fmt.Errorf("restic backup: %w", werr) + } + return summary, nil +} + +// envSlice converts Env's typed fields into the os/exec env shape. +func (e Env) envSlice() []string { + out := []string{ + "RESTIC_REPOSITORY=" + e.RepoURL, + "RESTIC_PASSWORD=" + e.RepoPassword, + // Feed restic via env-only — keeps creds off ps(1). + "PATH=/usr/local/bin:/usr/bin:/bin", + } + for k, v := range e.ExtraEnv { + out = append(out, k+"="+v) + } + return out +} + +func pumpStdout(r io.Reader, handle LineHandler, summary **BackupSummary) error { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) // status lines can get long + for scanner.Scan() { + line := scanner.Text() + if handle == nil { + continue + } + // Sniff message_type without a full Unmarshal so non-JSON + // lines (very rare on stdout, but possible) survive. 
+ if !strings.HasPrefix(line, "{") { + handle("stdout", line, nil) + continue + } + var probe struct { + MessageType string `json:"message_type"` + } + if err := json.Unmarshal([]byte(line), &probe); err != nil { + handle("stdout", line, nil) + continue + } + switch EventKind(probe.MessageType) { + case EventStatus, EventVerbose: + var ev BackupStatus + if json.Unmarshal([]byte(line), &ev) == nil { + handle("event", line, ev) + continue + } + case EventSummary: + var ev BackupSummary + if json.Unmarshal([]byte(line), &ev) == nil { + if summary != nil { + s := ev + *summary = &s + } + handle("event", line, ev) + continue + } + case EventErrorEvent: + handle("event", line, nil) + continue + } + handle("stdout", line, nil) + } + return scanner.Err() +} + +func pumpStderr(r io.Reader, handle LineHandler) error { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + if handle != nil { + handle("stderr", scanner.Text(), nil) + } + } + return scanner.Err() +} + +// suppress unused-time false-positive when nothing else in this file +// uses time but the file is part of a package that grows over time +var _ = time.Now diff --git a/internal/server/http/jobs.go b/internal/server/http/jobs.go new file mode 100644 index 0000000..ef956c2 --- /dev/null +++ b/internal/server/http/jobs.go @@ -0,0 +1,134 @@ +package http + +import ( + "encoding/json" + stdhttp "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/oklog/ulid/v2" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/auth" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// runNowRequest is the body of POST /api/hosts/:id/jobs. +type runNowRequest struct { + Kind api.JobKind `json:"kind"` + Args []string `json:"args,omitempty"` // restic CLI args (paths for backup, etc.) 
+} + +type runNowResponse struct { + JobID string `json:"job_id"` + Status string `json:"status"` // "queued" +} + +// handleRunNow dispatches a job to the named host. Authenticated; +// rejects if the host isn't connected (caller should retry once +// the agent comes back). +func (s *Server) handleRunNow(w stdhttp.ResponseWriter, r *stdhttp.Request) { + user, ok := s.requireUser(r) + if !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + + hostID := chi.URLParam(r, "id") + if hostID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "") + return + } + + var req runNowRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSONError(w, stdhttp.StatusBadRequest, "invalid_json", err.Error()) + return + } + if !validJobKind(req.Kind) { + writeJSONError(w, stdhttp.StatusBadRequest, "invalid_kind", + "kind must be one of backup|forget|prune|check|unlock") + return + } + + host, err := s.deps.Store.GetHost(r.Context(), hostID) + if err != nil { + writeJSONError(w, stdhttp.StatusNotFound, "host_not_found", "") + return + } + + if !s.deps.Hub.Connected(host.ID) { + writeJSONError(w, stdhttp.StatusServiceUnavailable, "host_offline", + "agent is not currently connected; try again when it reconnects") + return + } + + jobID := ulid.Make().String() + now := time.Now().UTC() + if err := s.deps.Store.CreateJob(r.Context(), store.Job{ + ID: jobID, + HostID: host.ID, + Kind: string(req.Kind), + ActorKind: "user", + ActorID: &user.ID, + CreatedAt: now, + }); err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + + env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{ + JobID: jobID, + Kind: req.Kind, + Args: req.Args, + }) + if err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + if err := s.deps.Hub.Send(r.Context(), host.ID, env); err != nil { + writeJSONError(w, 
stdhttp.StatusServiceUnavailable, "host_offline", err.Error()) + return + } + + _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ + ID: ulid.Make().String(), + UserID: &user.ID, + Actor: "user", + Action: "job.run_now", + TargetKind: ptr("job"), + TargetID: &jobID, + TS: now, + }) + + writeJSON(w, stdhttp.StatusAccepted, runNowResponse{ + JobID: jobID, + Status: "queued", + }) +} + +// requireUser resolves the session cookie to a user row. Stub of the +// session-auth middleware that lands in P1-04's full pass. +func (s *Server) requireUser(r *stdhttp.Request) (*store.User, bool) { + c, err := r.Cookie(sessionCookieName) + if err != nil { + return nil, false + } + sess, err := s.deps.Store.LookupSession(r.Context(), auth.HashToken(c.Value)) + if err != nil { + return nil, false + } + u, err := s.deps.Store.GetUserByID(r.Context(), sess.UserID) + if err != nil { + return nil, false + } + return u, true +} + +func validJobKind(k api.JobKind) bool { + switch k { + case api.JobBackup, api.JobForget, api.JobPrune, api.JobCheck, api.JobUnlock: + return true + } + return false +} diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 1b505cc..aabc9cc 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -83,6 +83,9 @@ func (s *Server) routes(r chi.Router) { // /hosts/{id}/enrollment-token (regenerate) lands when the // host page can call it; for now just the create endpoint. r.Post("/enrollment-tokens", s.handleCreateEnrollmentToken) + + // Run-now: dispatch a job to a host's agent. + r.Post("/hosts/{id}/jobs", s.handleRunNow) }) // Agent ↔ server WebSocket. Bearer-authenticated inside the handler. 
diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index 8828953..96059e9 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -149,18 +149,44 @@ func runAgentLoop(ctx context.Context, c *Conn, hostID string, deps HandlerDeps) } } -// dispatchAgentMessage routes a single envelope to its handler. Only -// hello + heartbeat are wired up in Phase 1's first slice; the rest -// land with P1-18+ (jobs) and P2 (schedules). +// dispatchAgentMessage routes a single envelope to its handler. func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.Envelope, deps HandlerDeps) { switch env.Type { case api.MsgHeartbeat: _ = deps.Store.TouchHost(ctx, hostID, time.Now().UTC()) - case api.MsgJobStarted, api.MsgJobProgress, api.MsgJobFinished, - api.MsgLogStream, api.MsgSnapshotsRpt, api.MsgRepoStats, - api.MsgScheduleAck, api.MsgCommandResult: - // TODO(P1-18+): persist + fan out to subscribed browsers. + case api.MsgJobStarted: + var p api.JobStartedPayload + _ = env.UnmarshalPayload(&p) + if err := deps.Store.MarkJobStarted(ctx, p.JobID, p.StartedAt); err != nil { + slog.Warn("ws: mark job started", "job_id", p.JobID, "err", err) + } + + case api.MsgJobProgress: + // We don't persist every progress tick; the live UI subscribes + // to a fan-out channel that lands with P1-21 / the UI work. + // TODO: implement the ws fan-out hub for browsers. 
+ _ = env + + case api.MsgJobFinished: + var p api.JobFinishedPayload + _ = env.UnmarshalPayload(&p) + errMsg := p.Error + if err := deps.Store.MarkJobFinished(ctx, p.JobID, + string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil { + slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err) + } + + case api.MsgLogStream: + var p api.LogStreamLine + _ = env.UnmarshalPayload(&p) + if err := deps.Store.AppendJobLog(ctx, p.JobID, p.Seq, p.TS, + string(p.Stream), p.Payload); err != nil { + slog.Warn("ws: append job log", "job_id", p.JobID, "err", err) + } + + case api.MsgSnapshotsRpt, api.MsgRepoStats, api.MsgScheduleAck, api.MsgCommandResult: + // TODO(P1-22 + P2): persist these projections. slog.Debug("ws msg not yet handled", "type", env.Type, "host_id", hostID) case api.MsgError: diff --git a/internal/store/jobs.go b/internal/store/jobs.go new file mode 100644 index 0000000..c432ed2 --- /dev/null +++ b/internal/store/jobs.go @@ -0,0 +1,156 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "time" +) + +// Job mirrors the jobs table. +type Job struct { + ID string + HostID string + Kind string + Status string + ScheduledID *string + ActorKind string // user|schedule|system + ActorID *string + StartedAt *time.Time + FinishedAt *time.Time + ExitCode *int + Stats json.RawMessage + Error *string + CreatedAt time.Time +} + +// CreateJob inserts a queued job. The agent will mark it running +// when it actually starts work. +func (s *Store) CreateJob(ctx context.Context, j Job) error { + _, err := s.db.ExecContext(ctx, + `INSERT INTO jobs (id, host_id, kind, status, actor_kind, actor_id, created_at) + VALUES (?, ?, ?, 'queued', ?, ?, ?)`, + j.ID, j.HostID, j.Kind, j.ActorKind, nullable(j.ActorID), + j.CreatedAt.UTC().Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("store: create job: %w", err) + } + return nil +} + +// MarkJobStarted flips status to 'running' and records started_at. 
+func (s *Store) MarkJobStarted(ctx context.Context, id string, when time.Time) error { + res, err := s.db.ExecContext(ctx, + `UPDATE jobs + SET status = 'running', started_at = ? + WHERE id = ? AND status IN ('queued','running')`, + when.UTC().Format(time.RFC3339Nano), id) + if err != nil { + return fmt.Errorf("store: mark started: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + return nil +} + +// MarkJobFinished records the terminal state. +func (s *Store) MarkJobFinished(ctx context.Context, id, status string, exitCode int, stats json.RawMessage, errMsg string, when time.Time) error { + if len(stats) == 0 { + stats = json.RawMessage("null") + } + res, err := s.db.ExecContext(ctx, + `UPDATE jobs + SET status = ?, finished_at = ?, exit_code = ?, stats = ?, error = ? + WHERE id = ?`, + status, + when.UTC().Format(time.RFC3339Nano), + exitCode, string(stats), nullableStr(errMsg), id) + if err != nil { + return fmt.Errorf("store: mark finished: %w", err) + } + n, _ := res.RowsAffected() + if n == 0 { + return ErrNotFound + } + return nil +} + +// AppendJobLog records one line of agent output. seq is the agent's +// monotonic sequence number; gaps imply lost data. +func (s *Store) AppendJobLog(ctx context.Context, jobID string, seq int64, ts time.Time, stream, payload string) error { + _, err := s.db.ExecContext(ctx, + `INSERT INTO job_logs (job_id, seq, ts, stream, payload) VALUES (?,?,?,?,?)`, + jobID, seq, ts.UTC().Format(time.RFC3339Nano), stream, payload) + if err != nil { + return fmt.Errorf("store: append job log: %w", err) + } + return nil +} + +// GetJob returns a job row. 
+func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) { + row := s.db.QueryRowContext(ctx, + `SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id, + started_at, finished_at, exit_code, stats, error, created_at + FROM jobs WHERE id = ?`, id) + var ( + j Job + schedID sql.NullString + actorID sql.NullString + startedAt sql.NullString + finishedAt sql.NullString + exitCode sql.NullInt64 + stats sql.NullString + errMsg sql.NullString + createdAt string + ) + if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, + &j.ActorKind, &actorID, &startedAt, &finishedAt, + &exitCode, &stats, &errMsg, &createdAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("store: scan job: %w", err) + } + if schedID.Valid { + s := schedID.String + j.ScheduledID = &s + } + if actorID.Valid { + s := actorID.String + j.ActorID = &s + } + if startedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, startedAt.String) + j.StartedAt = &t + } + if finishedAt.Valid { + t, _ := time.Parse(time.RFC3339Nano, finishedAt.String) + j.FinishedAt = &t + } + if exitCode.Valid { + i := int(exitCode.Int64) + j.ExitCode = &i + } + if stats.Valid && stats.String != "" { + j.Stats = json.RawMessage(stats.String) + } + if errMsg.Valid { + s := errMsg.String + j.Error = &s + } + t, _ := time.Parse(time.RFC3339Nano, createdAt) + j.CreatedAt = t + return &j, nil +} + +func nullableStr(s string) any { + if s == "" { + return nil + } + return s +}