9d5775fb47
- alert: update_failed (per-host, dedup=hostID) + fleet_update_halted
(system-scoped, host_id NULL via new RaiseOrTouchSystem helper).
- ws: UpdateWatcher tracks in-flight command.update dispatches and
reconciles them against incoming hello envelopes — success path
marks the job succeeded and auto-resolves the alert; 90s timeout
marks the job failed and raises update_failed.
- http: POST /api/hosts/{id}/update (admin-only JSON) + the HTMX
/hosts/{id}/update form variant. Pre-checks: host exists, online,
agent_version != current, no running update job. Refactored core
into Server.dispatchHostUpdate so the fleet worker can share it
without going through HTTP.
- fleetupdate: rolling worker iterating through host slots, halting
on first failure and raising fleet_update_halted. Polling-based
version-match (re-read hosts.agent_version every 1s up to 95s) —
no extra plumbing into the WS hello path. At-most-one-running is
enforced at the store layer (ErrFleetUpdateRunning).
- cmd/server: wire UpdateWatcher and FleetWorker into the main
goroutine; the worker uses a small serverDispatcher adapter that
delegates back into Server.DispatchHostUpdate.
Tests: watcher (success/timeout/mismatch/late-hello), HTTP endpoint
(happy + four pre-check branches + RBAC), worker (two-host happy,
timeout-halt, host-offline-halt, already-at-target skip, cancel
mid-run, double-Start guard).
218 lines
7.0 KiB
Go
218 lines
7.0 KiB
Go
package http
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
stdhttp "net/http"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/oklog/ulid/v2"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/version"
|
|
)
|
|
|
|
// UpdateWatcher is the slim view of the ws.updateWatcher this package
|
|
// uses for tracking in-flight update dispatches. Defined as an
|
|
// interface so a test can inject a stub.
|
|
type UpdateWatcher interface {
|
|
Track(jobID, hostID string)
|
|
}
|
|
|
|
// FleetWorker is the slim view of the fleetupdate.Worker this package
|
|
// uses. Kept here for forward compatibility with P6-15 — the host
|
|
// update endpoint itself does not use it.
|
|
type FleetWorker interface {
|
|
Start(ctx context.Context, userID, targetVersion string, hostIDs []string) (string, error)
|
|
Cancel(ctx context.Context, fleetUpdateID string) error
|
|
}
|
|
|
|
// dispatchHostUpdateResult communicates structured outcomes from the
|
|
// shared dispatch path so both the HTTP handler and the fleet worker
|
|
// can format errors in their own idiom.
|
|
type dispatchHostUpdateResult struct {
|
|
JobID string
|
|
Code string // "" on success
|
|
Status int // HTTP status the JSON handler should use on error
|
|
Msg string // human-readable detail (optional)
|
|
}
|
|
|
|
// dispatchHostUpdate is the shared "send command.update to one host"
|
|
// path. It performs every pre-check (host exists, online, version
|
|
// mismatch, no in-flight update) and on success creates the jobs row,
|
|
// audits, dispatches the WS envelope, and tracks the watcher entry.
|
|
//
|
|
// Pre-checks are returned as structured codes rather than HTTP errors
|
|
// so the fleet worker can map them onto its own per-host status enum
|
|
// without parsing strings.
|
|
func (s *Server) dispatchHostUpdate(ctx context.Context, hostID string, actorKind string, actorID *string) dispatchHostUpdateResult {
|
|
host, err := s.deps.Store.GetHost(ctx, hostID)
|
|
if err != nil || host == nil {
|
|
return dispatchHostUpdateResult{Code: "host_not_found", Status: stdhttp.StatusNotFound}
|
|
}
|
|
if !s.deps.Hub.Connected(host.ID) {
|
|
return dispatchHostUpdateResult{
|
|
Code: "host_offline", Status: stdhttp.StatusConflict,
|
|
Msg: "agent is not currently connected",
|
|
}
|
|
}
|
|
if host.AgentVersion != "" && host.AgentVersion == version.Version {
|
|
return dispatchHostUpdateResult{
|
|
Code: "already_up_to_date", Status: stdhttp.StatusConflict,
|
|
Msg: "agent already running version " + version.Version,
|
|
}
|
|
}
|
|
existing, err := s.deps.Store.RunningUpdateJobForHost(ctx, hostID)
|
|
if err != nil {
|
|
return dispatchHostUpdateResult{Code: "internal", Status: stdhttp.StatusInternalServerError, Msg: err.Error()}
|
|
}
|
|
if existing != "" {
|
|
return dispatchHostUpdateResult{
|
|
Code: "update_in_progress", Status: stdhttp.StatusConflict,
|
|
Msg: "an update job is already in flight for this host",
|
|
JobID: existing,
|
|
}
|
|
}
|
|
|
|
jobID := ulid.Make().String()
|
|
now := time.Now().UTC()
|
|
if err := s.deps.Store.CreateJob(ctx, store.Job{
|
|
ID: jobID, HostID: hostID, Kind: "update",
|
|
ActorKind: actorKind, ActorID: actorID,
|
|
CreatedAt: now,
|
|
}); err != nil {
|
|
return dispatchHostUpdateResult{Code: "internal", Status: stdhttp.StatusInternalServerError, Msg: err.Error()}
|
|
}
|
|
env, err := api.Marshal(api.MsgCommandUpdate, ulid.Make().String(), api.CommandUpdatePayload{
|
|
JobID: jobID,
|
|
})
|
|
if err != nil {
|
|
return dispatchHostUpdateResult{Code: "internal", Status: stdhttp.StatusInternalServerError, Msg: err.Error()}
|
|
}
|
|
if err := s.deps.Hub.Send(ctx, hostID, env); err != nil {
|
|
// Roll the job to failed so we don't leak a queued row.
|
|
_ = s.deps.Store.MarkJobFinished(ctx, jobID, "failed", -1, nil, err.Error(), time.Now().UTC())
|
|
return dispatchHostUpdateResult{
|
|
Code: "host_offline", Status: stdhttp.StatusConflict, Msg: err.Error(),
|
|
}
|
|
}
|
|
if s.deps.UpdateWatcher != nil {
|
|
s.deps.UpdateWatcher.Track(jobID, hostID)
|
|
}
|
|
|
|
auditPayload, _ := json.Marshal(map[string]string{
|
|
"job_id": jobID,
|
|
"target_version": version.Version,
|
|
})
|
|
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
|
ID: ulid.Make().String(),
|
|
UserID: actorID,
|
|
Actor: actorKind,
|
|
Action: "host.update_dispatched",
|
|
TargetKind: ptr("host"),
|
|
TargetID: &hostID,
|
|
TS: now,
|
|
Payload: auditPayload,
|
|
})
|
|
|
|
return dispatchHostUpdateResult{JobID: jobID}
|
|
}
|
|
|
|
// handleHostUpdate is POST /api/hosts/{id}/update — JSON, admin-only.
|
|
func (s *Server) handleHostUpdate(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
|
user, ok := s.requireUser(r)
|
|
if !ok {
|
|
writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorised", "")
|
|
return
|
|
}
|
|
hostID := chi.URLParam(r, "id")
|
|
if hostID == "" {
|
|
writeJSONError(w, stdhttp.StatusBadRequest, "missing_host_id", "")
|
|
return
|
|
}
|
|
actor := "user"
|
|
var actorID *string
|
|
if user != nil {
|
|
actorID = &user.ID
|
|
}
|
|
res := s.dispatchHostUpdate(r.Context(), hostID, actor, actorID)
|
|
if res.Code != "" {
|
|
writeJSONError(w, res.Status, res.Code, res.Msg)
|
|
return
|
|
}
|
|
writeJSON(w, stdhttp.StatusAccepted, map[string]string{"job_id": res.JobID})
|
|
}
|
|
|
|
// handleHostUpdateForm is the HTMX-friendly POST /hosts/{id}/update
|
|
// variant. On success it sets HX-Redirect to the job detail page; on
|
|
// pre-check failures it renders an inline error banner.
|
|
func (s *Server) handleHostUpdateForm(w stdhttp.ResponseWriter, r *stdhttp.Request) {
|
|
user, ok := s.requireUser(r)
|
|
if !ok {
|
|
stdhttp.Error(w, "unauthorised", stdhttp.StatusUnauthorized)
|
|
return
|
|
}
|
|
hostID := chi.URLParam(r, "id")
|
|
if hostID == "" {
|
|
stdhttp.Error(w, "missing host_id", stdhttp.StatusBadRequest)
|
|
return
|
|
}
|
|
actor := "user"
|
|
var actorID *string
|
|
if user != nil {
|
|
actorID = &user.ID
|
|
}
|
|
res := s.dispatchHostUpdate(r.Context(), hostID, actor, actorID)
|
|
if res.Code != "" {
|
|
// Inline banner for HTMX swaps. Mirrors what host_credentials
|
|
// returns on validation errors — small text/html fragment.
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.WriteHeader(res.Status)
|
|
msg := hostUpdateErrorMessage(res.Code, res.Msg)
|
|
_, _ = w.Write([]byte(`<div class="banner banner-error" role="alert">` + htmlEscape(msg) + `</div>`))
|
|
return
|
|
}
|
|
w.Header().Set("HX-Redirect", "/jobs/"+res.JobID)
|
|
w.WriteHeader(stdhttp.StatusOK)
|
|
}
|
|
|
|
func hostUpdateErrorMessage(code, msg string) string {
|
|
switch code {
|
|
case "host_not_found":
|
|
return "Host not found."
|
|
case "host_offline":
|
|
return "Agent is offline; can't deliver the update command."
|
|
case "already_up_to_date":
|
|
return "Agent is already running the current version."
|
|
case "update_in_progress":
|
|
return "An update is already in progress for this host."
|
|
}
|
|
if msg != "" {
|
|
return msg
|
|
}
|
|
return "Update dispatch failed."
|
|
}
|
|
|
|
// htmlEscape is a minimal HTML-attr-safe escaper. Avoids pulling html/template
|
|
// for a one-shot inline banner.
|
|
func htmlEscape(s string) string {
|
|
out := make([]byte, 0, len(s))
|
|
for i := 0; i < len(s); i++ {
|
|
switch s[i] {
|
|
case '&':
|
|
out = append(out, []byte("&")...)
|
|
case '<':
|
|
out = append(out, []byte("<")...)
|
|
case '>':
|
|
out = append(out, []byte(">")...)
|
|
case '"':
|
|
out = append(out, []byte(""")...)
|
|
default:
|
|
out = append(out, s[i])
|
|
}
|
|
}
|
|
return string(out)
|
|
}
|