agent: command.update handler + updater package (Linux + Windows)

This commit is contained in:
2026-05-06 21:42:50 +01:00
parent f31f6edde7
commit efed96f67a
8 changed files with 421 additions and 17 deletions
+8 -4
View File
@@ -148,6 +148,7 @@ func run() error {
resticBin: resticBin,
resticVer: snap.ResticVersion,
resticSupportsNoOwnership: resticSupportsNoOwnership,
serverURL: cfg.ServerURL,
secrets: sec,
scheduler: scheduler.New(),
}
@@ -214,6 +215,7 @@ type dispatcher struct {
resticBin string
resticVer string // e.g. "0.17.1"; empty if restic isn't installed yet
resticSupportsNoOwnership bool // captured at startup from `restic restore --help`
serverURL string // base URL of the server (used by the self-update fetch)
secrets *secrets.Store
scheduler *scheduler.Scheduler
@@ -395,10 +397,12 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S
"up_kbps", up, "down_kbps", down)
}
case api.MsgAgentUpdateAvail:
var p api.AgentUpdateAvailablePayload
_ = env.UnmarshalPayload(&p)
slog.Info("ws agent: update available", "version", p.LatestVersion, "url", p.PackageURL)
case api.MsgCommandUpdate:
var p api.CommandUpdatePayload
if err := env.UnmarshalPayload(&p); err != nil {
return fmt.Errorf("command.update: %w", err)
}
go d.runUpdate(ctx, p, tx)
default:
slog.Debug("ws agent: ignored message", "type", env.Type)
+65
View File
@@ -0,0 +1,65 @@
package main
import (
"context"
"fmt"
"log/slog"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/updater"
"gitea.dcglab.co.uk/steve/restic-manager/internal/agent/wsclient"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// runUpdate services a server-dispatched command.update.
//
// It announces the job with job.started, mirrors each progress line to
// both the local slog output and a log.stream envelope tagged with the
// job id (so the live job page captures pre-restart state), and then
// hands over to the platform updater.
//
// The terminal job state is owned by the server, not the agent: success
// means "the agent re-hellos with a matching version", which the agent
// cannot assert about itself. The only job.finished sent from here is
// the failure one, emitted when updater.Update returns an error.
//
// All sends are best-effort: marshal and send errors are deliberately
// dropped because there is nothing useful to do with them mid-update.
func (d *dispatcher) runUpdate(ctx context.Context, p api.CommandUpdatePayload, tx wsclient.Sender) {
	// progress writes one formatted line to slog and to the job's log stream.
	progress := func(format string, args ...any) {
		msg := fmt.Sprintf(format, args...)
		slog.Info("ws agent: update: " + msg)
		streamEnv, marshalErr := api.Marshal(api.MsgLogStream, "", api.LogStreamLine{
			JobID:   p.JobID,
			TS:      time.Now().UTC(),
			Stream:  api.LogStdout,
			Payload: msg,
		})
		if marshalErr != nil {
			return
		}
		_ = tx.Send(streamEnv)
	}

	if started, marshalErr := api.Marshal(api.MsgJobStarted, "", api.JobStartedPayload{
		JobID:     p.JobID,
		Kind:      api.JobUpdate,
		StartedAt: time.Now().UTC(),
	}); marshalErr == nil {
		_ = tx.Send(started)
	}

	progress("fetching new binary from %s", d.serverURL)
	updateErr := updater.Update(ctx, d.serverURL)
	if updateErr == nil {
		// Not expected to be reached: both platform implementations of
		// Update end in os.Exit once the swap (Linux) or the helper
		// hand-off (Windows) has been set up, so a nil return would only
		// happen if that ever changes.
		return
	}

	progress("update failed: %v", updateErr)
	failed, marshalErr := api.Marshal(api.MsgJobFinished, "", api.JobFinishedPayload{
		JobID:      p.JobID,
		Status:     api.JobFailed,
		FinishedAt: time.Now().UTC(),
		Error:      updateErr.Error(),
	})
	if marshalErr != nil {
		return
	}
	_ = tx.Send(failed)
}
+100
View File
@@ -0,0 +1,100 @@
// Package updater carries the agent's self-update logic.
//
// The flow is operator-driven: the server dispatches a command.update
// WS envelope, the agent fetches a fresh binary from the server's
// /agent/binary endpoint, atomic-renames it over the running binary
// (Linux) or hands off to a detached helper script (Windows), and
// exits cleanly so the service manager restarts under the new
// binary. See docs/superpowers/specs/2026-05-06-p6-01-02-...
//
// Platform-specific code is build-tagged into updater_unix.go /
// updater_windows.go. This file holds the shared HTTP fetch + path
// helpers + the test seam.
package updater
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"time"
)
// fetch downloads the agent binary for the running GOOS/GOARCH from the
// server's /agent/binary endpoint and stages it next to the current
// binary.
//
// Parameters:
//   - ctx bounds the HTTP request (in addition to the client's own
//     five-minute timeout).
//   - serverURL is the server's base URL; trailing slashes are tolerated.
//   - binaryPath is the path of the binary that will later be replaced.
//
// On success it returns the staged path, which is always
// binaryPath + ".new"; the file has been fsynced, closed and chmodded
// to 0755. On any failure after the staged file was created the file
// is removed again, and a non-200 response or an empty body is
// reported as an error so a zero-byte "binary" is never handed to the
// swap step.
func fetch(ctx context.Context, serverURL, binaryPath string) (string, error) {
	// A base URL configured as "https://host/" would otherwise yield
	// "https://host//agent/binary", which many routers do not match.
	base := strings.TrimRight(serverURL, "/")
	url := fmt.Sprintf("%s/agent/binary?os=%s&arch=%s", base, runtime.GOOS, runtime.GOARCH)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", fmt.Errorf("agent binary fetch: build request: %w", err)
	}
	c := &http.Client{Timeout: 5 * time.Minute}
	res, err := c.Do(req)
	if err != nil {
		return "", fmt.Errorf("agent binary fetch: %w", err)
	}
	defer func() { _ = res.Body.Close() }()
	if res.StatusCode != http.StatusOK {
		return "", fmt.Errorf("agent binary fetch: %s", res.Status)
	}

	stagePath := binaryPath + ".new"
	f, err := os.OpenFile(stagePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755)
	if err != nil {
		return "", fmt.Errorf("agent binary fetch: create %s: %w", stagePath, err)
	}
	if err := writeStaged(f, res.Body); err != nil {
		_ = os.Remove(stagePath)
		return "", fmt.Errorf("agent binary fetch: stage %s: %w", stagePath, err)
	}
	// OpenFile's mode is filtered by the umask and ignored when the file
	// already existed, so set the executable bits explicitly.
	if err := os.Chmod(stagePath, 0o755); err != nil {
		_ = os.Remove(stagePath)
		return "", fmt.Errorf("agent binary fetch: chmod %s: %w", stagePath, err)
	}
	return stagePath, nil
}

// writeStaged streams body into f, rejects an empty download, then
// fsyncs and closes f. f is always closed when writeStaged returns;
// removing the file on error is left to the caller.
func writeStaged(f *os.File, body io.Reader) error {
	n, err := io.Copy(f, body)
	if err != nil {
		_ = f.Close()
		return fmt.Errorf("write: %w", err)
	}
	if n == 0 {
		_ = f.Close()
		return fmt.Errorf("server returned an empty body")
	}
	if err := f.Sync(); err != nil {
		_ = f.Close()
		return fmt.Errorf("sync: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("close: %w", err)
	}
	return nil
}
// resolveOwnBinary reports the absolute path of the currently running
// executable, as given by os.Executable and normalised with
// filepath.Abs.
//
// A result of exactly "/proc/self/exe" is rejected with an error: that
// magic link identifies the process, not a real file location, so it
// cannot serve as a rename target for the update.
func resolveOwnBinary() (string, error) {
	exe, err := os.Executable()
	if err != nil {
		return "", err
	}
	resolved, err := filepath.Abs(exe)
	switch {
	case err != nil:
		return "", err
	case resolved == "/proc/self/exe":
		return "", fmt.Errorf("cannot resolve own binary path (/proc/self/exe)")
	}
	return resolved, nil
}
// UpdateForTest is the platform-neutral test seam: it runs the same
// fetch-then-swap sequence as the production Update against an
// explicit binaryPath, but returns instead of exiting the process so
// unit tests can inspect the resulting files.
//
// The whole operation is bounded by a five-minute timeout derived from
// context.Background. The error of whichever step fails first (fetch,
// then swap) is returned unchanged.
func UpdateForTest(serverURL, binaryPath string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	staged, fetchErr := fetch(ctx, serverURL, binaryPath)
	if fetchErr == nil {
		return swap(staged, binaryPath)
	}
	return fetchErr
}
+87
View File
@@ -0,0 +1,87 @@
//go:build !windows
package updater
import (
"bytes"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"testing"
)
// TestUpdate_LinuxAtomicSwap drives UpdateForTest end to end: a fake
// "running binary" is written to a temp dir, a stub server answers
// /agent/binary with new bytes, and afterwards the test checks that the
// binary holds the new bytes, <bin>.old holds the previous bytes, and
// the staged <bin>.new no longer exists.
func TestUpdate_LinuxAtomicSwap(t *testing.T) {
	const previous = "OLD"
	replacement := []byte("NEW BINARY CONTENTS")

	agentPath := filepath.Join(t.TempDir(), "agent")
	if err := os.WriteFile(agentPath, []byte(previous), 0o755); err != nil {
		t.Fatal(err)
	}

	// Stub of the server's binary endpoint; it also verifies that the
	// agent asks for its own os/arch.
	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/agent/binary" {
			http.NotFound(w, r)
			return
		}
		q := r.URL.Query()
		gotOS, gotArch := q.Get("os"), q.Get("arch")
		if gotOS != runtime.GOOS || gotArch != runtime.GOARCH {
			t.Errorf("query mismatch: got os=%s arch=%s want %s/%s",
				gotOS, gotArch, runtime.GOOS, runtime.GOARCH)
		}
		_, _ = io.Copy(w, bytes.NewReader(replacement))
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	defer server.Close()

	if err := UpdateForTest(server.URL, agentPath); err != nil {
		t.Fatalf("update: %v", err)
	}

	current, err := os.ReadFile(agentPath)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(current, replacement) {
		t.Fatalf("binary contents: got %q want %q", current, replacement)
	}

	backup, err := os.ReadFile(agentPath + ".old")
	if err != nil {
		t.Fatalf("agent.old missing: %v", err)
	}
	if string(backup) != previous {
		t.Fatalf("agent.old contents: got %q want %q", backup, previous)
	}

	_, statErr := os.Stat(agentPath + ".new")
	if !os.IsNotExist(statErr) {
		t.Fatalf("agent.new should be absent after swap, got err=%v", statErr)
	}
}
// TestUpdate_FetchHTTPError checks the failure path when the server
// does not publish a binary for this os/arch: UpdateForTest must return
// an error, the existing binary must be left untouched, and no staged
// <bin>.new file may be left behind.
func TestUpdate_FetchHTTPError(t *testing.T) {
	tmp := t.TempDir()
	binPath := filepath.Join(tmp, "agent")
	if err := os.WriteFile(binPath, []byte("OLD"), 0o755); err != nil {
		t.Fatal(err)
	}

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, `{"error":"binary_not_published"}`, http.StatusNotFound)
	}))
	defer srv.Close()

	if err := UpdateForTest(srv.URL, binPath); err == nil {
		t.Fatal("expected error, got nil")
	}

	// Fail loudly if the binary vanished instead of comparing against an
	// empty read.
	got, err := os.ReadFile(binPath)
	if err != nil {
		t.Fatalf("read binary after failed update: %v", err)
	}
	if string(got) != "OLD" {
		t.Fatalf("binary should not have changed, got %q", got)
	}
	if _, err := os.Stat(binPath + ".new"); !os.IsNotExist(err) {
		t.Fatalf("agent.new should not exist after a failed fetch, got err=%v", err)
	}
}
+73
View File
@@ -0,0 +1,73 @@
//go:build !windows
package updater
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"time"
)
// Update fetches the new binary, swaps it in, then exits so systemd
// restarts the process under the new binary. The caller should close
// the WS connection cleanly (so the server transitions the host to
// disconnected immediately rather than waiting for the heartbeat
// sweep) before invoking.
//
// On success Update does not return: it calls os.Exit(0) after a short
// pause. A returned error means nothing was swapped; if the swap step
// itself failed, the staged <bin>.new download is removed again so a
// failed update does not leave a stray binary next to the real one.
//
// Service-user assumption: the agent runs as root under the
// systemd-shipped unit, which can write the binary path directly.
// If the agent ever moves to a non-root service user, this breaks —
// would need a setuid helper or an out-of-process update service.
func Update(ctx context.Context, serverURL string) error {
	binPath, err := resolveOwnBinary()
	if err != nil {
		return fmt.Errorf("resolve own binary: %w", err)
	}
	stage, err := fetch(ctx, serverURL, binPath)
	if err != nil {
		return err
	}
	if err := swap(stage, binPath); err != nil {
		// Best-effort cleanup; the swap error is the one worth reporting.
		_ = os.Remove(stage)
		return err
	}
	slog.Info("agent self-update: binary swapped, exiting for systemd restart",
		"binary", binPath)
	// Give logger / WS close-frame a moment to flush, then exit.
	time.Sleep(200 * time.Millisecond)
	os.Exit(0)
	return nil // unreachable, but os.Exit is not a terminating statement
}
// swap preserves the running binary as <bin>.old (M1 — keep one
// revision back for hand-rolled rollback), then atomic-renames the
// staged binary into place. Linux supports rename-while-open so this
// works even though the running process holds the source open.
//
// The backup is first written to <bin>.old.tmp and only renamed to
// <bin>.old once it is complete and fsynced, so a copy that fails
// half-way never truncates or corrupts an existing <bin>.old from a
// previous update. On any error the staged file is left where it is
// and binPath is untouched.
func swap(stagePath, binPath string) error {
	oldPath := binPath + ".old"
	tmpPath := oldPath + ".tmp"
	if err := backupBinary(binPath, tmpPath); err != nil {
		_ = os.Remove(tmpPath)
		return err
	}
	if err := os.Rename(tmpPath, oldPath); err != nil {
		_ = os.Remove(tmpPath)
		return fmt.Errorf("rename .old.tmp to .old: %w", err)
	}
	if err := os.Rename(stagePath, binPath); err != nil {
		return fmt.Errorf("rename .new over running binary: %w", err)
	}
	return nil
}

// backupBinary copies the file at srcPath to dstPath (created or
// truncated, mode 0755), fsyncs it and closes it.
func backupBinary(srcPath, dstPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return fmt.Errorf("open running binary: %w", err)
	}
	defer func() { _ = src.Close() }()

	dst, err := os.OpenFile(dstPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o755)
	if err != nil {
		return fmt.Errorf("open .old: %w", err)
	}
	if _, err := io.Copy(dst, src); err != nil {
		_ = dst.Close()
		return fmt.Errorf("copy to .old: %w", err)
	}
	if err := dst.Sync(); err != nil {
		_ = dst.Close()
		return fmt.Errorf("sync .old: %w", err)
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("close .old: %w", err)
	}
	return nil
}
+73
View File
@@ -0,0 +1,73 @@
//go:build windows
package updater
import (
"context"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
)
// helperScript is the batch-file template for the detached Windows
// update helper. It is rendered with fmt.Sprintf, args order:
//
// %[1]s — running binary path (source for the .old copy)
// %[2]s — .old path
// %[3]s — staged .new path
// %[4]s — running binary path (rename target)
//
// Rendered, the script waits three seconds, copies the running binary
// to .old, stops the restic-manager-agent service, polls `sc query`
// once a second until it reports STOPPED, moves the staged binary over
// the running one, starts the service again, and finally deletes
// itself. The `%%` in the last line is a Sprintf escape; the rendered
// script contains `%~f0`, the batch expression for the script's own
// full path.
//
// Paths are substituted verbatim inside double quotes, with no
// escaping of batch metacharacters.
//
// NOTE(review): the helper is started with DETACHED_PROCESS, i.e.
// without a console. `timeout` is known to refuse to run when its
// input is redirected; confirm it actually delays in this launch
// context, otherwise the initial wait and the poll interval are
// skipped.
const helperScript = `@echo off
timeout /t 3 /nobreak >nul
copy /Y "%[1]s" "%[2]s"
sc stop restic-manager-agent
:wait
sc query restic-manager-agent | find "STOPPED" >nul
if errorlevel 1 (timeout /t 1 /nobreak >nul & goto wait)
move /Y "%[3]s" "%[4]s"
sc start restic-manager-agent
del "%%~f0"
`
// Update on Windows can't overwrite the running .exe in-process
// (exclusive file lock), so we stage the new binary, write a small
// detached helper script that waits, stops the service, swaps the
// binary, and starts the service, then exit cleanly. SCM treats
// clean exits after sc stop as intentional and does not auto-restart;
// the helper's final sc start handles that.
//
// The helper is written as agent-update.cmd in the directory of the
// running binary and launched through cmd.exe as a detached, windowless
// process. On success Update does not return: it calls os.Exit(0) after
// a short pause. If writing or starting the helper fails, the staged
// download and the helper script are removed again and the error is
// returned.
func Update(ctx context.Context, serverURL string) error {
	// Win32 process creation flags (see CreateProcess documentation).
	const (
		detachedProcess = 0x00000008 // DETACHED_PROCESS
		createNoWindow  = 0x08000000 // CREATE_NO_WINDOW
	)

	binPath, err := resolveOwnBinary()
	if err != nil {
		return fmt.Errorf("resolve own binary: %w", err)
	}
	stage, err := fetch(ctx, serverURL, binPath)
	if err != nil {
		return err
	}

	helperPath := filepath.Join(filepath.Dir(binPath), "agent-update.cmd")
	body := fmt.Sprintf(helperScript, binPath, binPath+".old", stage, binPath)
	if err := os.WriteFile(helperPath, []byte(body), 0o755); err != nil {
		// Best-effort cleanup of whatever was written so far.
		_ = os.Remove(helperPath)
		_ = os.Remove(stage)
		return fmt.Errorf("write update helper %s: %w", helperPath, err)
	}

	cmd := exec.Command("cmd.exe", "/c", helperPath)
	cmd.SysProcAttr = &syscall.SysProcAttr{
		HideWindow:    true,
		CreationFlags: detachedProcess | createNoWindow,
	}
	if err := cmd.Start(); err != nil {
		// Nothing will ever consume these files if the helper never ran.
		_ = os.Remove(helperPath)
		_ = os.Remove(stage)
		return fmt.Errorf("start update helper: %w", err)
	}

	slog.Info("agent self-update: helper spawned, exiting cleanly",
		"binary", binPath, "helper", helperPath)
	// Give logger / WS close-frame a moment to flush, then exit.
	time.Sleep(200 * time.Millisecond)
	os.Exit(0)
	return nil // unreachable, but os.Exit is not a terminating statement
}
// swap exists on Windows only so the package builds: UpdateForTest
// references it, but the real swap is carried out by the helper script
// that Update launches. It ignores both arguments and always returns an
// error saying so.
func swap(_, _ string) error {
	errNotImplemented := fmt.Errorf("updater.swap not implemented on Windows; use the helper script via Update")
	return errNotImplemented
}
+9 -7
View File
@@ -63,6 +63,7 @@ const (
JobUnlock JobKind = "unlock"
JobRestore JobKind = "restore"
JobDiff JobKind = "diff"
JobUpdate JobKind = "update"
)
// JobStatus is the lifecycle state of a job.
@@ -361,13 +362,14 @@ type ConfigUpdatePayload struct {
BandwidthDownKBps *int `json:"bandwidth_down_kbps,omitempty"`
}
// AgentUpdateAvailablePayload — informational only; the agent does
// NOT self-update. See spec.md §4.2 for the package-manager-based
// update model.
type AgentUpdateAvailablePayload struct {
LatestVersion string `json:"latest_version"`
PackageURL string `json:"package_url"` // apt repo / choco source
Changelog string `json:"changelog,omitempty"`
// CommandUpdatePayload is the body of a command.update message. It
// carries no operational data — the agent already knows its own
// os/arch and fetches from its configured server URL via
// /agent/binary. JobID is the server-issued id of the update job; the
// agent echoes it on log.stream lines so the live job log captures
// pre-restart progress, then either exits (Linux) or hands off to a
// detached helper script (Windows).
type CommandUpdatePayload struct {
// JobID identifies the server-side update job this command belongs to.
JobID string `json:"job_id"`
}
// TreeListRequestPayload is the body of a tree.list RPC. Used by the
+6 -6
View File
@@ -29,12 +29,12 @@ const (
// Server → agent message types.
const (
MsgCommandRun MessageType = "command.run"
MsgCommandCancel MessageType = "command.cancel"
MsgScheduleSet MessageType = "schedule.set"
MsgConfigUpdate MessageType = "config.update"
MsgAgentUpdateAvail MessageType = "agent.update.available"
MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children
MsgCommandRun MessageType = "command.run"
MsgCommandCancel MessageType = "command.cancel"
MsgScheduleSet MessageType = "schedule.set"
MsgConfigUpdate MessageType = "config.update"
MsgCommandUpdate MessageType = "command.update"
MsgTreeList MessageType = "tree.list" // sync RPC: list a snapshot's children
)
// Envelope is the framing for every WS message in either direction.