alert: wire engine into ws hello + MarkJobFinished + offline sweep
- ws.HandlerDeps gains an AlertEngine *alert.Engine field; populated from http.Deps.AlertEngine (nil until G1 constructs the engine) - runAgentLoop calls NotifyHostOnline after MarkHostHello succeeds - dispatchAgentMessage MsgJobFinished case calls NotifyJobFinished, looking up the job Kind via Store.GetJob before notifying - store.MarkHostsOfflineStaleReturnIDs added: SELECT+UPDATE in one transaction, returns the IDs that flipped to offline - offline sweeper in cmd/server/main.go switched to the new variant; TODO(G1) comment marks where NotifyHostOffline calls will land
This commit is contained in:
+5
-2
@@ -175,8 +175,11 @@ func run() error {
|
|||||||
}
|
}
|
||||||
case <-offlineTick.C:
|
case <-offlineTick.C:
|
||||||
cutoff := time.Now().Add(-90 * time.Second)
|
cutoff := time.Now().Add(-90 * time.Second)
|
||||||
if n, err := st.MarkHostsOfflineStale(ctx, cutoff); err == nil && n > 0 {
|
if ids, err := st.MarkHostsOfflineStaleReturnIDs(ctx, cutoff); err == nil && len(ids) > 0 {
|
||||||
slog.Info("marked hosts offline (stale heartbeat)", "n", n)
|
slog.Info("marked hosts offline (stale heartbeat)", "n", len(ids))
|
||||||
|
// TODO(G1): notify engine once deps.AlertEngine is wired.
|
||||||
|
// for _, id := range ids { alertEngine.NotifyHostOffline(id) }
|
||||||
|
_ = ids
|
||||||
}
|
}
|
||||||
case <-pendingDrainTick.C:
|
case <-pendingDrainTick.C:
|
||||||
srv.DrainAllDue(ctx)
|
srv.DrainAllDue(ctx)
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/alert"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/crypto"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/config"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
|
||||||
@@ -29,6 +30,10 @@ type Deps struct {
|
|||||||
Hub *ws.Hub
|
Hub *ws.Hub
|
||||||
JobHub *ws.JobHub
|
JobHub *ws.JobHub
|
||||||
UI *ui.Renderer
|
UI *ui.Renderer
|
||||||
|
// AlertEngine (optional, wired in G1) receives job-finished and
|
||||||
|
// host-online events from the WS handler. Nil until G1 constructs
|
||||||
|
// the engine at boot.
|
||||||
|
AlertEngine *alert.Engine
|
||||||
// Version is the binary's build version, surfaced in the chrome.
|
// Version is the binary's build version, surfaced in the chrome.
|
||||||
// Empty falls back to "dev".
|
// Empty falls back to "dev".
|
||||||
Version string
|
Version string
|
||||||
@@ -225,6 +230,7 @@ func (s *Server) routes(r chi.Router) {
|
|||||||
Hub: s.deps.Hub,
|
Hub: s.deps.Hub,
|
||||||
Store: s.deps.Store,
|
Store: s.deps.Store,
|
||||||
JobHub: s.deps.JobHub,
|
JobHub: s.deps.JobHub,
|
||||||
|
AlertEngine: s.deps.AlertEngine,
|
||||||
OnHello: s.onAgentHello,
|
OnHello: s.onAgentHello,
|
||||||
OnScheduleAck: s.applyScheduleAck,
|
OnScheduleAck: s.applyScheduleAck,
|
||||||
OnScheduleFire: s.dispatchScheduledJob,
|
OnScheduleFire: s.dispatchScheduledJob,
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"github.com/coder/websocket"
|
"github.com/coder/websocket"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/alert"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
@@ -22,6 +23,9 @@ type HandlerDeps struct {
|
|||||||
Hub *Hub
|
Hub *Hub
|
||||||
Store *store.Store
|
Store *store.Store
|
||||||
JobHub *JobHub
|
JobHub *JobHub
|
||||||
|
// AlertEngine receives job-finished and host-online events so the
|
||||||
|
// alert engine can evaluate its rules. Optional; nil = no-op.
|
||||||
|
AlertEngine *alert.Engine
|
||||||
// OnHello is called once per successful hello, after the host row
|
// OnHello is called once per successful hello, after the host row
|
||||||
// has been touched and the conn registered. Used by the HTTP
|
// has been touched and the conn registered. Used by the HTTP
|
||||||
// layer to push host_credentials down as a config.update before
|
// layer to push host_credentials down as a config.update before
|
||||||
@@ -140,6 +144,9 @@ func runAgentLoop(ctx context.Context, c *Conn, hostID string, deps HandlerDeps)
|
|||||||
helloPayload.ProtocolVersion, now); err != nil {
|
helloPayload.ProtocolVersion, now); err != nil {
|
||||||
slog.Error("ws mark host hello failed", "host_id", hostID, "err", err)
|
slog.Error("ws mark host hello failed", "host_id", hostID, "err", err)
|
||||||
}
|
}
|
||||||
|
if deps.AlertEngine != nil {
|
||||||
|
deps.AlertEngine.NotifyHostOnline(hostID)
|
||||||
|
}
|
||||||
|
|
||||||
deps.Hub.Register(hostID, c)
|
deps.Hub.Register(hostID, c)
|
||||||
defer deps.Hub.Unregister(hostID, c)
|
defer deps.Hub.Unregister(hostID, c)
|
||||||
@@ -210,6 +217,17 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
|
|||||||
if deps.JobHub != nil {
|
if deps.JobHub != nil {
|
||||||
deps.JobHub.Broadcast(p.JobID, env)
|
deps.JobHub.Broadcast(p.JobID, env)
|
||||||
}
|
}
|
||||||
|
if deps.AlertEngine != nil {
|
||||||
|
if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil && job != nil {
|
||||||
|
deps.AlertEngine.NotifyJobFinished(alert.JobFinishedEvent{
|
||||||
|
HostID: hostID,
|
||||||
|
JobID: p.JobID,
|
||||||
|
Kind: job.Kind,
|
||||||
|
Status: string(p.Status),
|
||||||
|
When: p.FinishedAt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
case api.MsgLogStream:
|
case api.MsgLogStream:
|
||||||
var p api.LogStreamLine
|
var p api.LogStreamLine
|
||||||
|
|||||||
@@ -110,6 +110,55 @@ func (s *Store) MarkHostsOfflineStale(ctx context.Context, cutoff time.Time) (in
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarkHostsOfflineStaleReturnIDs flips any host that hasn't been seen
|
||||||
|
// since before `cutoff` from 'online' to 'offline' and returns the IDs
|
||||||
|
// of every host that was flipped. Uses a single transaction.
|
||||||
|
func (s *Store) MarkHostsOfflineStaleReturnIDs(ctx context.Context, cutoff time.Time) ([]string, error) {
|
||||||
|
tx, err := s.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: begin tx: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
cutoffStr := cutoff.UTC().Format(time.RFC3339Nano)
|
||||||
|
rows, err := tx.QueryContext(ctx,
|
||||||
|
`SELECT id FROM hosts
|
||||||
|
WHERE status = 'online'
|
||||||
|
AND (last_seen_at IS NULL OR last_seen_at < ?)`,
|
||||||
|
cutoffStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: select stale hosts: %w", err)
|
||||||
|
}
|
||||||
|
var ids []string
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
if err := rows.Scan(&id); err != nil {
|
||||||
|
_ = rows.Close()
|
||||||
|
return nil, fmt.Errorf("store: scan stale host id: %w", err)
|
||||||
|
}
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("store: iterate stale hosts: %w", err)
|
||||||
|
}
|
||||||
|
_ = rows.Close()
|
||||||
|
|
||||||
|
if len(ids) > 0 {
|
||||||
|
if _, err := tx.ExecContext(ctx,
|
||||||
|
`UPDATE hosts SET status = 'offline'
|
||||||
|
WHERE status = 'online'
|
||||||
|
AND (last_seen_at IS NULL OR last_seen_at < ?)`,
|
||||||
|
cutoffStr); err != nil {
|
||||||
|
return nil, fmt.Errorf("store: mark offline: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return nil, fmt.Errorf("store: commit: %w", err)
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListHosts returns every host. Phase 1 callers fit a small fleet in
|
// ListHosts returns every host. Phase 1 callers fit a small fleet in
|
||||||
// memory; pagination lands when it matters.
|
// memory; pagination lands when it matters.
|
||||||
func (s *Store) ListHosts(ctx context.Context) ([]Host, error) {
|
func (s *Store) ListHosts(ctx context.Context) ([]Host, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user