// Listing metadata: 185 lines, 5.4 KiB, Go.

package ws
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
|
)
|
|
|
|
// updateTimeout is the longest the watcher will wait, after a
// command.update dispatch, for the agent to reconnect reporting its new
// version. The sweep fails any tracked update older than this.
//
// It is declared as a var rather than a const so tests can shorten it.
var updateTimeout = 90 * time.Second
|
|
|
|
// AlertRaiser is the narrow part of *alert.Engine that the update watcher
// actually calls. It is declared in this package, not in the alert
// package, so the dependency points from the consumer to the
// implementation rather than the other way round.
type AlertRaiser interface {
	// RaiseUpdateFailed is invoked by the watcher when the update job
	// jobID for host hostID is failed (the sweep calls it on timeout);
	// reason is the human-readable cause and when is the failure time.
	RaiseUpdateFailed(ctx context.Context, hostID, jobID, reason string, when time.Time)

	// ResolveUpdateFailed is invoked by the watcher once host hostID has
	// come back on the expected version; when is the time of success.
	ResolveUpdateFailed(ctx context.Context, hostID string, when time.Time)
}
|
|
|
|
// UpdateWatcher tracks in-flight agent-update dispatches and reconciles
|
|
// them against incoming hello envelopes. Entries land on Track and
|
|
// resolve via OnHello (success path) or the periodic sweep (timeout).
|
|
type UpdateWatcher struct {
|
|
store *store.Store
|
|
alerts AlertRaiser
|
|
jobHub *JobHub // optional — if nil, no fan-out to browser streams
|
|
|
|
mu sync.Mutex
|
|
entries map[string]*updateEntry // hostID → entry
|
|
|
|
tickPeriod time.Duration
|
|
}
|
|
|
|
// updateEntry is the watcher's record of one in-flight update dispatch.
type updateEntry struct {
	jobID     string    // job to mark finished when the entry settles
	startedAt time.Time // when Track registered the dispatch; the sweep measures age from here

	// terminated flips to true as soon as the entry reaches a terminal
	// state, so that a hello arriving afterwards cannot revive it.
	terminated bool
}
|
|
|
|
// NewUpdateWatcher builds an unstarted watcher. Call Run in a goroutine
|
|
// to start the periodic sweep.
|
|
func NewUpdateWatcher(st *store.Store, alerts AlertRaiser, jobHub *JobHub) *UpdateWatcher {
|
|
return &UpdateWatcher{
|
|
store: st,
|
|
alerts: alerts,
|
|
jobHub: jobHub,
|
|
entries: make(map[string]*updateEntry),
|
|
tickPeriod: 5 * time.Second,
|
|
}
|
|
}
|
|
|
|
// Track registers a freshly-dispatched update job. A subsequent Track
|
|
// for the same host replaces the prior entry (last-write-wins).
|
|
func (w *UpdateWatcher) Track(jobID, hostID string) {
|
|
if w == nil {
|
|
return
|
|
}
|
|
w.mu.Lock()
|
|
w.entries[hostID] = &updateEntry{jobID: jobID, startedAt: time.Now()}
|
|
w.mu.Unlock()
|
|
}
|
|
|
|
// OnHello is called by the WS handler after a successful hello has been
|
|
// persisted. If a tracked update for the host matches the targetVersion,
|
|
// the job is marked succeeded and any open update_failed alert is
|
|
// auto-resolved. A non-matching version is a no-op (the watcher keeps
|
|
// waiting until the timeout).
|
|
func (w *UpdateWatcher) OnHello(ctx context.Context, hostID, agentVersion, targetVersion string) {
|
|
if w == nil {
|
|
return
|
|
}
|
|
w.mu.Lock()
|
|
e, ok := w.entries[hostID]
|
|
if !ok || e.terminated {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
if agentVersion != targetVersion {
|
|
// Not the version we asked for — keep waiting.
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
e.terminated = true
|
|
jobID := e.jobID
|
|
delete(w.entries, hostID)
|
|
w.mu.Unlock()
|
|
|
|
now := time.Now().UTC()
|
|
if err := w.store.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", now); err != nil {
|
|
slog.Warn("ws update watcher: mark succeeded", "job_id", jobID, "host_id", hostID, "err", err)
|
|
}
|
|
w.publishJobFinished(jobID, api.JobSucceeded, 0, "", now)
|
|
if w.alerts != nil {
|
|
w.alerts.ResolveUpdateFailed(ctx, hostID, now)
|
|
}
|
|
}
|
|
|
|
// Run drives the periodic sweep. Returns when ctx is done.
|
|
func (w *UpdateWatcher) Run(ctx context.Context) {
|
|
if w == nil {
|
|
return
|
|
}
|
|
t := time.NewTicker(w.tickPeriod)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case now := <-t.C:
|
|
w.sweep(ctx, now)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *UpdateWatcher) sweep(ctx context.Context, now time.Time) {
|
|
type expired struct {
|
|
hostID string
|
|
jobID string
|
|
age time.Duration
|
|
}
|
|
var toFail []expired
|
|
w.mu.Lock()
|
|
for hostID, e := range w.entries {
|
|
if e.terminated {
|
|
continue
|
|
}
|
|
if now.Sub(e.startedAt) >= updateTimeout {
|
|
toFail = append(toFail, expired{hostID: hostID, jobID: e.jobID, age: now.Sub(e.startedAt)})
|
|
e.terminated = true
|
|
delete(w.entries, hostID)
|
|
}
|
|
}
|
|
w.mu.Unlock()
|
|
|
|
for _, x := range toFail {
|
|
reason := fmt.Sprintf("timeout: agent did not reconnect within %s", updateTimeout)
|
|
stamp := now.UTC()
|
|
errMsg := reason
|
|
if err := w.store.MarkJobFinished(ctx, x.jobID, "failed", -1, nil, errMsg, stamp); err != nil {
|
|
slog.Warn("ws update watcher: mark failed", "job_id", x.jobID, "host_id", x.hostID, "err", err)
|
|
}
|
|
w.publishJobFinished(x.jobID, api.JobFailed, -1, errMsg, stamp)
|
|
if w.alerts != nil {
|
|
w.alerts.RaiseUpdateFailed(ctx, x.hostID, x.jobID, reason, stamp)
|
|
}
|
|
}
|
|
}
|
|
|
|
// publishJobFinished pushes a synthetic job.finished envelope into the
|
|
// JobHub so any browser still streaming this job sees it terminate.
|
|
// The agent itself exits before it can send job.finished (it has to —
|
|
// it's about to relaunch into the new binary), so without this fan-out
|
|
// the /jobs/{id} page hangs until reload.
|
|
//
|
|
// Best-effort: if the hub is nil or the envelope can't be marshalled
|
|
// we log and move on — the DB-side state is already correct, this is
|
|
// purely a UI wake-up.
|
|
func (w *UpdateWatcher) publishJobFinished(jobID string, status api.JobStatus, exitCode int, errMsg string, finishedAt time.Time) {
|
|
if w.jobHub == nil {
|
|
return
|
|
}
|
|
payload := api.JobFinishedPayload{
|
|
JobID: jobID,
|
|
Status: status,
|
|
ExitCode: exitCode,
|
|
FinishedAt: finishedAt,
|
|
Error: errMsg,
|
|
}
|
|
env, err := api.Marshal(api.MsgJobFinished, "", payload)
|
|
if err != nil {
|
|
slog.Warn("ws update watcher: marshal synthetic job.finished", "job_id", jobID, "err", err)
|
|
return
|
|
}
|
|
w.jobHub.Broadcast(jobID, env)
|
|
}
|