agent: command.update handler + updater package (Linux + Windows)
This commit is contained in:
@@ -0,0 +1,100 @@
|
||||
// Package updater carries the agent's self-update logic.
|
||||
//
|
||||
// The flow is operator-driven: the server dispatches a command.update
|
||||
// WS envelope, the agent fetches a fresh binary from the server's
|
||||
// /agent/binary endpoint, atomic-renames it over the running binary
|
||||
// (Linux) or hands off to a detached helper script (Windows), and
|
||||
// exits cleanly so the service manager restarts under the new
|
||||
// binary. See docs/superpowers/specs/2026-05-06-p6-01-02-...
|
||||
//
|
||||
// Platform-specific code is build-tagged into updater_unix.go /
|
||||
// updater_windows.go. This file holds the shared HTTP fetch + path
|
||||
// helpers + the test seam.
|
||||
package updater
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fetch downloads the new binary into <binaryPath>.new, fsyncs, chmods.
|
||||
// Returns the path of the staged file (always binaryPath + ".new").
|
||||
func fetch(ctx context.Context, serverURL, binaryPath string) (string, error) {
|
||||
url := fmt.Sprintf("%s/agent/binary?os=%s&arch=%s", serverURL, runtime.GOOS, runtime.GOARCH)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
c := &http.Client{Timeout: 5 * time.Minute}
|
||||
res, err := c.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() { _ = res.Body.Close() }()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("agent binary fetch: %s", res.Status)
|
||||
}
|
||||
|
||||
stagePath := binaryPath + ".new"
|
||||
f, err := os.OpenFile(stagePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if _, copyErr := io.Copy(f, res.Body); copyErr != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(stagePath)
|
||||
return "", copyErr
|
||||
}
|
||||
if syncErr := f.Sync(); syncErr != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(stagePath)
|
||||
return "", syncErr
|
||||
}
|
||||
if closeErr := f.Close(); closeErr != nil {
|
||||
_ = os.Remove(stagePath)
|
||||
return "", closeErr
|
||||
}
|
||||
if err := os.Chmod(stagePath, 0o755); err != nil {
|
||||
_ = os.Remove(stagePath)
|
||||
return "", err
|
||||
}
|
||||
return stagePath, nil
|
||||
}
|
||||
|
||||
// resolveOwnBinary returns the absolute path of the running binary.
|
||||
// Refuses /proc/self/exe — that's what os.Executable returns on some
|
||||
// systems but the path can't be renamed across.
|
||||
func resolveOwnBinary() (string, error) {
|
||||
p, err := os.Executable()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
abs, err := filepath.Abs(p)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if abs == "/proc/self/exe" {
|
||||
return "", fmt.Errorf("cannot resolve own binary path (/proc/self/exe)")
|
||||
}
|
||||
return abs, nil
|
||||
}
|
||||
|
||||
// UpdateForTest is the platform-neutral test seam. In production the
|
||||
// platform-specific Update fetches, swaps, then exits the process.
|
||||
// UpdateForTest stops short of the exit so unit tests can assert on
|
||||
// file state.
|
||||
func UpdateForTest(serverURL, binaryPath string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
stage, err := fetch(ctx, serverURL, binaryPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return swap(stage, binaryPath)
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
//go:build !windows
|
||||
|
||||
package updater
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestUpdate_LinuxAtomicSwap stages a fake "running binary" file, runs
|
||||
// UpdateForTest against a fake /agent/binary server, and asserts that
|
||||
// the binary was swapped, .old preserves the previous bytes, and .new
|
||||
// was renamed away.
|
||||
func TestUpdate_LinuxAtomicSwap(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
binPath := filepath.Join(tmp, "agent")
|
||||
if err := os.WriteFile(binPath, []byte("OLD"), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newBytes := []byte("NEW BINARY CONTENTS")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/agent/binary" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
gotOS, gotArch := r.URL.Query().Get("os"), r.URL.Query().Get("arch")
|
||||
if gotOS != runtime.GOOS || gotArch != runtime.GOARCH {
|
||||
t.Errorf("query mismatch: got os=%s arch=%s want %s/%s",
|
||||
gotOS, gotArch, runtime.GOOS, runtime.GOARCH)
|
||||
}
|
||||
_, _ = io.Copy(w, bytes.NewReader(newBytes))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
if err := UpdateForTest(srv.URL, binPath); err != nil {
|
||||
t.Fatalf("update: %v", err)
|
||||
}
|
||||
|
||||
got, err := os.ReadFile(binPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(got) != string(newBytes) {
|
||||
t.Fatalf("binary contents: got %q want %q", got, newBytes)
|
||||
}
|
||||
old, err := os.ReadFile(binPath + ".old")
|
||||
if err != nil {
|
||||
t.Fatalf("agent.old missing: %v", err)
|
||||
}
|
||||
if string(old) != "OLD" {
|
||||
t.Fatalf("agent.old contents: got %q want %q", old, "OLD")
|
||||
}
|
||||
if _, err := os.Stat(binPath + ".new"); !os.IsNotExist(err) {
|
||||
t.Fatalf("agent.new should be absent after swap, got err=%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdate_FetchHTTPError surfaces the server's status when the
|
||||
// binary is not published for this os/arch.
|
||||
func TestUpdate_FetchHTTPError(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
binPath := filepath.Join(tmp, "agent")
|
||||
if err := os.WriteFile(binPath, []byte("OLD"), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, `{"error":"binary_not_published"}`, http.StatusNotFound)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
err := UpdateForTest(srv.URL, binPath)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
got, _ := os.ReadFile(binPath)
|
||||
if string(got) != "OLD" {
|
||||
t.Fatalf("binary should not have changed, got %q", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
//go:build !windows
|
||||
|
||||
package updater
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Update fetches the new binary, swaps it in, then exits so systemd
|
||||
// restarts the process under the new binary. The caller should close
|
||||
// the WS connection cleanly (so the server transitions the host to
|
||||
// disconnected immediately rather than waiting for the heartbeat
|
||||
// sweep) before invoking.
|
||||
//
|
||||
// Service-user assumption: the agent runs as root under the
|
||||
// systemd-shipped unit, which can write the binary path directly.
|
||||
// If the agent ever moves to a non-root service user, this breaks —
|
||||
// would need a setuid helper or an out-of-process update service.
|
||||
func Update(ctx context.Context, serverURL string) error {
|
||||
binPath, err := resolveOwnBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stage, err := fetch(ctx, serverURL, binPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := swap(stage, binPath); err != nil {
|
||||
return err
|
||||
}
|
||||
slog.Info("agent self-update: binary swapped, exiting for systemd restart",
|
||||
"binary", binPath)
|
||||
// Give logger / WS close-frame a moment to flush, then exit.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
os.Exit(0)
|
||||
return nil // unreachable
|
||||
}
|
||||
|
||||
// swap copies the running binary to <bin>.old (M1 — keep one revision
|
||||
// back for hand-rolled rollback), then atomic-renames the staged
|
||||
// binary into place. Linux supports rename-while-open so this works
|
||||
// even though the running process holds the source open.
|
||||
func swap(stagePath, binPath string) error {
|
||||
src, err := os.Open(binPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open running binary: %w", err)
|
||||
}
|
||||
defer func() { _ = src.Close() }()
|
||||
dst, err := os.OpenFile(binPath+".old", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open .old: %w", err)
|
||||
}
|
||||
if _, err := io.Copy(dst, src); err != nil {
|
||||
_ = dst.Close()
|
||||
return fmt.Errorf("copy to .old: %w", err)
|
||||
}
|
||||
if err := dst.Sync(); err != nil {
|
||||
_ = dst.Close()
|
||||
return err
|
||||
}
|
||||
if err := dst.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.Rename(stagePath, binPath); err != nil {
|
||||
return fmt.Errorf("rename .new over running binary: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
//go:build windows
|
||||
|
||||
package updater
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// helperScript is rendered with fmt.Sprintf, args order:
|
||||
//
|
||||
// %[1]s — running binary path (source for the .old copy)
|
||||
// %[2]s — .old path
|
||||
// %[3]s — staged .new path
|
||||
// %[4]s — running binary path (rename target)
|
||||
const helperScript = `@echo off
|
||||
timeout /t 3 /nobreak >nul
|
||||
copy /Y "%[1]s" "%[2]s"
|
||||
sc stop restic-manager-agent
|
||||
:wait
|
||||
sc query restic-manager-agent | find "STOPPED" >nul
|
||||
if errorlevel 1 (timeout /t 1 /nobreak >nul & goto wait)
|
||||
move /Y "%[3]s" "%[4]s"
|
||||
sc start restic-manager-agent
|
||||
del "%%~f0"
|
||||
`
|
||||
|
||||
// Update on Windows can't overwrite the running .exe in-process
|
||||
// (exclusive file lock), so we stage the new binary, write a small
|
||||
// detached helper script that waits, stops the service, swaps the
|
||||
// binary, and starts the service, then exit cleanly. SCM treats
|
||||
// clean exits after sc stop as intentional and does not auto-restart;
|
||||
// the helper's final sc start handles that.
|
||||
func Update(ctx context.Context, serverURL string) error {
|
||||
binPath, err := resolveOwnBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stage, err := fetch(ctx, serverURL, binPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
helperPath := filepath.Join(filepath.Dir(binPath), "agent-update.cmd")
|
||||
body := fmt.Sprintf(helperScript, binPath, binPath+".old", stage, binPath)
|
||||
if err := os.WriteFile(helperPath, []byte(body), 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
cmd := exec.Command("cmd.exe", "/c", helperPath)
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||
HideWindow: true,
|
||||
CreationFlags: 0x00000008 | 0x08000000, // DETACHED_PROCESS | CREATE_NO_WINDOW
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
slog.Info("agent self-update: helper spawned, exiting cleanly",
|
||||
"binary", binPath, "helper", helperPath)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
os.Exit(0)
|
||||
return nil // unreachable
|
||||
}
|
||||
|
||||
// swap is unused on Windows — the helper script does the swap.
|
||||
// Defined to satisfy the build (UpdateForTest references it).
|
||||
func swap(_, _ string) error {
|
||||
return fmt.Errorf("updater.swap not implemented on Windows; use the helper script via Update")
|
||||
}
|
||||
@@ -63,6 +63,7 @@ const (
|
||||
JobUnlock JobKind = "unlock"
|
||||
JobRestore JobKind = "restore"
|
||||
JobDiff JobKind = "diff"
|
||||
JobUpdate JobKind = "update"
|
||||
)
|
||||
|
||||
// JobStatus is the lifecycle state of a job.
|
||||
@@ -361,13 +362,14 @@ type ConfigUpdatePayload struct {
|
||||
BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"`
|
||||
}
|
||||
|
||||
// AgentUpdateAvailablePayload — informational only; the agent does
|
||||
// NOT self-update. See spec.md §4.2 for the package-manager-based
|
||||
// update model.
|
||||
type AgentUpdateAvailablePayload struct {
|
||||
LatestVersion string `json:"latest_version"`
|
||||
PackageURL string `json:"package_url"` // apt repo / choco source
|
||||
Changelog string `json:"changelog,omitempty"`
|
||||
// CommandUpdatePayload carries no operational data — the agent
|
||||
// already knows its own os/arch and fetches from its configured
|
||||
// server URL via /agent/binary. JobID is the server-issued id of
|
||||
// the update job; the agent echoes it on log.stream lines so the
|
||||
// live job log captures pre-restart progress, then either exits
|
||||
// (Linux) or hands off to a detached helper script (Windows).
|
||||
type CommandUpdatePayload struct {
|
||||
JobID string `json:"job_id"`
|
||||
}
|
||||
|
||||
// TreeListRequestPayload is the body of a tree.list RPC. Used by the
|
||||
|
||||
@@ -29,12 +29,12 @@ const (
|
||||
|
||||
// Server → agent message types.
|
||||
const (
|
||||
MsgCommandRun MessageType = "command.run"
|
||||
MsgCommandCancel MessageType = "command.cancel"
|
||||
MsgScheduleSet MessageType = "schedule.set"
|
||||
MsgConfigUpdate MessageType = "config.update"
|
||||
MsgAgentUpdateAvail MessageType = "agent.update.available"
|
||||
MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children
|
||||
MsgCommandRun MessageType = "command.run"
|
||||
MsgCommandCancel MessageType = "command.cancel"
|
||||
MsgScheduleSet MessageType = "schedule.set"
|
||||
MsgConfigUpdate MessageType = "config.update"
|
||||
MsgCommandUpdate MessageType = "command.update"
|
||||
MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children
|
||||
)
|
||||
|
||||
// Envelope is the framing for every WS message in either direction.
|
||||
|
||||
Reference in New Issue
Block a user