From 74cf24c28bce9f8b43508ddac1dcf8e8ee7df50b Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Wed, 6 May 2026 21:42:50 +0100 Subject: [PATCH] agent: command.update handler + updater package (Linux + Windows) --- cmd/agent/main.go | 12 ++- cmd/agent/update_dispatch.go | 65 ++++++++++++++ internal/agent/updater/updater.go | 100 ++++++++++++++++++++++ internal/agent/updater/updater_test.go | 87 +++++++++++++++++++ internal/agent/updater/updater_unix.go | 73 ++++++++++++++++ internal/agent/updater/updater_windows.go | 73 ++++++++++++++++ internal/api/messages.go | 16 ++-- internal/api/wire.go | 12 +-- 8 files changed, 421 insertions(+), 17 deletions(-) create mode 100644 cmd/agent/update_dispatch.go create mode 100644 internal/agent/updater/updater.go create mode 100644 internal/agent/updater/updater_test.go create mode 100644 internal/agent/updater/updater_unix.go create mode 100644 internal/agent/updater/updater_windows.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d1b5041..e04dd2d 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -148,6 +148,7 @@ func run() error { resticBin: resticBin, resticVer: snap.ResticVersion, resticSupportsNoOwnership: resticSupportsNoOwnership, + serverURL: cfg.ServerURL, secrets: sec, scheduler: scheduler.New(), } @@ -214,6 +215,7 @@ type dispatcher struct { resticBin string resticVer string // e.g. "0.17.1"; empty if restic isn't installed yet resticSupportsNoOwnership bool // captured at startup from `restic restore --help` + serverURL string // base URL of the server (used by the self-update fetch) secrets *secrets.Store scheduler *scheduler.Scheduler @@ -395,10 +397,12 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S "up_kbps", up, "down_kbps", down) } - case api.MsgAgentUpdateAvail: - var p api.AgentUpdateAvailablePayload - _ = env.UnmarshalPayload(&p) - slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL) + case api.MsgCommandUpdate: + var p api.CommandUpdatePayload + if err := env.UnmarshalPayload(&p); err != nil { + return fmt.Errorf("command.update: %w", err) + } + go d.runUpdate(ctx, p, tx) default: slog.Debug("ws agent: ignored message", "type", env.Type) diff --git a/cmd/agent/update_dispatch.go b/cmd/agent/update_dispatch.go new file mode 100644 index 0000000..50bdddf --- /dev/null +++ b/cmd/agent/update_dispatch.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/updater" + "gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient" + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" +) + +// runUpdate handles a server-dispatched command.update. It logs progress +// via log.stream so the live job page captures pre-restart state, then +// calls the platform updater. On Linux the updater calls os.Exit; on +// Windows it spawns a detached helper and returns, with the agent then +// exiting. +// +// The terminal job state is set by the server, not the agent: success +// is "agent re-hellos with matching version" rather than anything the +// agent itself can assert. The only `job.finished` we send from here is +// on the failure path, before any restart attempt. +func (d *dispatcher) runUpdate(ctx context.Context, p api.CommandUpdatePayload, tx wsclient.Sender) { + logf := func(format string, args ...any) { + line := fmt.Sprintf(format, args...) + slog.Info("ws agent: update: " + line) + env, err := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{ + JobID: p.JobID, + TS: time.Now().UTC(), + Stream: api.LogStdout, + Payload: line, + }) + if err == nil { + _ = tx.Send(env) + } + } + + startedEnv, err := api.Marshal(api.MsgJobStarted, "", api.JobStartedPayload{ + JobID: p.JobID, + Kind: api.JobUpdate, + StartedAt: time.Now().UTC(), + }) + if err == nil { + _ = tx.Send(startedEnv) + } + + logf("fetching new binary from %s", d.serverURL) + if err := updater.Update(ctx, d.serverURL); err != nil { + logf("update failed: %v", err) + finishedEnv, mErr := api.Marshal(api.MsgJobFinished, "", api.JobFinishedPayload{ + JobID: p.JobID, + Status: api.JobFailed, + FinishedAt: time.Now().UTC(), + Error: err.Error(), + }) + if mErr == nil { + _ = tx.Send(finishedEnv) + } + return + } + // Unreachable on Linux (Update calls os.Exit). On Windows control + // returns here while the detached helper does the swap-and-restart; + // the agent then exits cleanly so SCM hands off. +} diff --git a/internal/agent/updater/updater.go b/internal/agent/updater/updater.go new file mode 100644 index 0000000..90b4f96 --- /dev/null +++ b/internal/agent/updater/updater.go @@ -0,0 +1,100 @@ +// Package updater carries the agent's self-update logic. +// +// The flow is operator-driven: the server dispatches a command.update +// WS envelope, the agent fetches a fresh binary from the server's +// /agent/binary endpoint, atomic-renames it over the running binary +// (Linux) or hands off to a detached helper script (Windows), and +// exits cleanly so the service manager restarts under the new +// binary. See docs/superpowers/specs/2026-05-06-p6-01-02-... +// +// Platform-specific code is build-tagged into updater_unix.go / +// updater_windows.go. This file holds the shared HTTP fetch + path +// helpers + the test seam. +package updater + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "runtime" + "time" +) + +// fetch downloads the new binary into .new, fsyncs, chmods. +// Returns the path of the staged file (always binaryPath + ".new"). +func fetch(ctx context.Context, serverURL, binaryPath string) (string, error) { + url := fmt.Sprintf("%s/agent/binary?os=%s&arch=%s", serverURL, runtime.GOOS, runtime.GOARCH) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + c := &http.Client{Timeout: 5 * time.Minute} + res, err := c.Do(req) + if err != nil { + return "", err + } + defer func() { _ = res.Body.Close() }() + if res.StatusCode != http.StatusOK { + return "", fmt.Errorf("agent binary fetch: %s", res.Status) + } + + stagePath := binaryPath + ".new" + f, err := os.OpenFile(stagePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755) + if err != nil { + return "", err + } + if _, copyErr := io.Copy(f, res.Body); copyErr != nil { + _ = f.Close() + _ = os.Remove(stagePath) + return "", copyErr + } + if syncErr := f.Sync(); syncErr != nil { + _ = f.Close() + _ = os.Remove(stagePath) + return "", syncErr + } + if closeErr := f.Close(); closeErr != nil { + _ = os.Remove(stagePath) + return "", closeErr + } + if err := os.Chmod(stagePath, 0o755); err != nil { + _ = os.Remove(stagePath) + return "", err + } + return stagePath, nil +} + +// resolveOwnBinary returns the absolute path of the running binary. +// Refuses /proc/self/exe — that's what os.Executable returns on some +// systems but the path can't be renamed across. +func resolveOwnBinary() (string, error) { + p, err := os.Executable() + if err != nil { + return "", err + } + abs, err := filepath.Abs(p) + if err != nil { + return "", err + } + if abs == "/proc/self/exe" { + return "", fmt.Errorf("cannot resolve own binary path (/proc/self/exe)") + } + return abs, nil +} + +// UpdateForTest is the platform-neutral test seam. In production the +// platform-specific Update fetches, swaps, then exits the process. +// UpdateForTest stops short of the exit so unit tests can assert on +// file state. +func UpdateForTest(serverURL, binaryPath string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + stage, err := fetch(ctx, serverURL, binaryPath) + if err != nil { + return err + } + return swap(stage, binaryPath) +} diff --git a/internal/agent/updater/updater_test.go b/internal/agent/updater/updater_test.go new file mode 100644 index 0000000..435ef21 --- /dev/null +++ b/internal/agent/updater/updater_test.go @@ -0,0 +1,87 @@ +//go:build !windows + +package updater + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "testing" +) + +// TestUpdate_LinuxAtomicSwap stages a fake "running binary" file, runs +// UpdateForTest against a fake /agent/binary server, and asserts that +// the binary was swapped, .old preserves the previous bytes, and .new +// was renamed away. +func TestUpdate_LinuxAtomicSwap(t *testing.T) { + tmp := t.TempDir() + binPath := filepath.Join(tmp, "agent") + if err := os.WriteFile(binPath, []byte("OLD"), 0o755); err != nil { + t.Fatal(err) + } + newBytes := []byte("NEW BINARY CONTENTS") + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/agent/binary" { + http.NotFound(w, r) + return + } + gotOS, gotArch := r.URL.Query().Get("os"), r.URL.Query().Get("arch") + if gotOS != runtime.GOOS || gotArch != runtime.GOARCH { + t.Errorf("query mismatch: got os=%s arch=%s want %s/%s", + gotOS, gotArch, runtime.GOOS, runtime.GOARCH) + } + _, _ = io.Copy(w, bytes.NewReader(newBytes)) + })) + defer srv.Close() + + if err := UpdateForTest(srv.URL, binPath); err != nil { + t.Fatalf("update: %v", err) + } + + got, err := os.ReadFile(binPath) + if err != nil { + t.Fatal(err) + } + if string(got) != string(newBytes) { + t.Fatalf("binary contents: got %q want %q", got, newBytes) + } + old, err := os.ReadFile(binPath + ".old") + if err != nil { + t.Fatalf("agent.old missing: %v", err) + } + if string(old) != "OLD" { + t.Fatalf("agent.old contents: got %q want %q", old, "OLD") + } + if _, err := os.Stat(binPath + ".new"); !os.IsNotExist(err) { + t.Fatalf("agent.new should be absent after swap, got err=%v", err) + } +} + +// TestUpdate_FetchHTTPError surfaces the server's status when the +// binary is not published for this os/arch. +func TestUpdate_FetchHTTPError(t *testing.T) { + tmp := t.TempDir() + binPath := filepath.Join(tmp, "agent") + if err := os.WriteFile(binPath, []byte("OLD"), 0o755); err != nil { + t.Fatal(err) + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `{"error":"binary_not_published"}`, http.StatusNotFound) + })) + defer srv.Close() + + err := UpdateForTest(srv.URL, binPath) + if err == nil { + t.Fatal("expected error, got nil") + } + got, _ := os.ReadFile(binPath) + if string(got) != "OLD" { + t.Fatalf("binary should not have changed, got %q", got) + } +} diff --git a/internal/agent/updater/updater_unix.go b/internal/agent/updater/updater_unix.go new file mode 100644 index 0000000..81ebf50 --- /dev/null +++ b/internal/agent/updater/updater_unix.go @@ -0,0 +1,73 @@ +//go:build !windows + +package updater + +import ( + "context" + "fmt" + "io" + "log/slog" + "os" + "time" +) + +// Update fetches the new binary, swaps it in, then exits so systemd +// restarts the process under the new binary. The caller should close +// the WS connection cleanly (so the server transitions the host to +// disconnected immediately rather than waiting for the heartbeat +// sweep) before invoking. +// +// Service-user assumption: the agent runs as root under the +// systemd-shipped unit, which can write the binary path directly. +// If the agent ever moves to a non-root service user, this breaks — +// would need a setuid helper or an out-of-process update service. +func Update(ctx context.Context, serverURL string) error { + binPath, err := resolveOwnBinary() + if err != nil { + return err + } + stage, err := fetch(ctx, serverURL, binPath) + if err != nil { + return err + } + if err := swap(stage, binPath); err != nil { + return err + } + slog.Info("agent self-update: binary swapped, exiting for systemd restart", + "binary", binPath) + // Give logger / WS close-frame a moment to flush, then exit. + time.Sleep(200 * time.Millisecond) + os.Exit(0) + return nil // unreachable +} + +// swap copies the running binary to .old (M1 — keep one revision +// back for hand-rolled rollback), then atomic-renames the staged +// binary into place. Linux supports rename-while-open so this works +// even though the running process holds the source open. +func swap(stagePath, binPath string) error { + src, err := os.Open(binPath) + if err != nil { + return fmt.Errorf("open running binary: %w", err) + } + defer func() { _ = src.Close() }() + dst, err := os.OpenFile(binPath+".old", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755) + if err != nil { + return fmt.Errorf("open .old: %w", err) + } + if _, err := io.Copy(dst, src); err != nil { + _ = dst.Close() + return fmt.Errorf("copy to .old: %w", err) + } + if err := dst.Sync(); err != nil { + _ = dst.Close() + return err + } + if err := dst.Close(); err != nil { + return err + } + if err := os.Rename(stagePath, binPath); err != nil { + return fmt.Errorf("rename .new over running binary: %w", err) + } + return nil +} diff --git a/internal/agent/updater/updater_windows.go b/internal/agent/updater/updater_windows.go new file mode 100644 index 0000000..0806472 --- /dev/null +++ b/internal/agent/updater/updater_windows.go @@ -0,0 +1,73 @@ +//go:build windows + +package updater + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "syscall" + "time" +) + +// helperScript is rendered with fmt.Sprintf, args order: +// +// %[1]s — running binary path (source for the .old copy) +// %[2]s — .old path +// %[3]s — staged .new path +// %[4]s — running binary path (rename target) +const helperScript = `@echo off +timeout /t 3 /nobreak >nul +copy /Y "%[1]s" "%[2]s" +sc stop restic-manager-agent +:wait +sc query restic-manager-agent | find "STOPPED" >nul +if errorlevel 1 (timeout /t 1 /nobreak >nul & goto wait) +move /Y "%[3]s" "%[4]s" +sc start restic-manager-agent +del "%%~f0" +` + +// Update on Windows can't overwrite the running .exe in-process +// (exclusive file lock), so we stage the new binary, write a small +// detached helper script that waits, stops the service, swaps the +// binary, and starts the service, then exit cleanly. SCM treats +// clean exits after sc stop as intentional and does not auto-restart; +// the helper's final sc start handles that. +func Update(ctx context.Context, serverURL string) error { + binPath, err := resolveOwnBinary() + if err != nil { + return err + } + stage, err := fetch(ctx, serverURL, binPath) + if err != nil { + return err + } + helperPath := filepath.Join(filepath.Dir(binPath), "agent-update.cmd") + body := fmt.Sprintf(helperScript, binPath, binPath+".old", stage, binPath) + if err := os.WriteFile(helperPath, []byte(body), 0o755); err != nil { + return err + } + cmd := exec.Command("cmd.exe", "/c", helperPath) + cmd.SysProcAttr = &syscall.SysProcAttr{ + HideWindow: true, + CreationFlags: 0x00000008 | 0x08000000, // DETACHED_PROCESS | CREATE_NO_WINDOW + } + if err := cmd.Start(); err != nil { + return err + } + slog.Info("agent self-update: helper spawned, exiting cleanly", + "binary", binPath, "helper", helperPath) + time.Sleep(200 * time.Millisecond) + os.Exit(0) + return nil // unreachable +} + +// swap is unused on Windows — the helper script does the swap. +// Defined to satisfy the build (UpdateForTest references it). +func swap(_, _ string) error { + return fmt.Errorf("updater.swap not implemented on Windows; use the helper script via Update") +} diff --git a/internal/api/messages.go b/internal/api/messages.go index 8ea18f2..b9d8a2e 100644 --- a/internal/api/messages.go +++ b/internal/api/messages.go @@ -63,6 +63,7 @@ const ( JobUnlock JobKind = "unlock" JobRestore JobKind = "restore" JobDiff JobKind = "diff" + JobUpdate JobKind = "update" ) // JobStatus is the lifecycle state of a job. @@ -361,13 +362,14 @@ type ConfigUpdatePayload struct { BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"` } -// AgentUpdateAvailablePayload — informational only; the agent does -// NOT self-update. See spec.md §4.2 for the package-manager-based -// update model. -type AgentUpdateAvailablePayload struct { - LatestVersion string `json:"latest_version"` - PackageURL string `json:"package_url"` // apt repo / choco source - Changelog string `json:"changelog,omitempty"` +// CommandUpdatePayload carries no operational data — the agent +// already knows its own os/arch and fetches from its configured +// server URL via /agent/binary. JobID is the server-issued id of +// the update job; the agent echoes it on log.stream lines so the +// live job log captures pre-restart progress, then either exits +// (Linux) or hands off to a detached helper script (Windows). +type CommandUpdatePayload struct { + JobID string `json:"job_id"` } // TreeListRequestPayload is the body of a tree.list RPC. Used by the diff --git a/internal/api/wire.go b/internal/api/wire.go index 005827f..4573738 100644 --- a/internal/api/wire.go +++ b/internal/api/wire.go @@ -29,12 +29,12 @@ const ( // Server → agent message types. const ( - MsgCommandRun MessageType = "command.run" - MsgCommandCancel MessageType = "command.cancel" - MsgScheduleSet MessageType = "schedule.set" - MsgConfigUpdate MessageType = "config.update" - MsgAgentUpdateAvail MessageType = "agent.update.available" - MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children + MsgCommandRun MessageType = "command.run" + MsgCommandCancel MessageType = "command.cancel" + MsgScheduleSet MessageType = "schedule.set" + MsgConfigUpdate MessageType = "config.update" + MsgCommandUpdate MessageType = "command.update" + MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children ) // Envelope is the framing for every WS message in either direction.