diff --git a/cmd/server/main.go b/cmd/server/main.go index a083a6d..bfd7900 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -175,8 +175,11 @@ func run() error { } case <-offlineTick.C: cutoff := time.Now().Add(-90 * time.Second) - if n, err := st.MarkHostsOfflineStale(ctx, cutoff); err == nil && n > 0 { - slog.Info("marked hosts offline (stale heartbeat)", "n", n) + if ids, err := st.MarkHostsOfflineStaleReturnIDs(ctx, cutoff); err == nil && len(ids) > 0 { + slog.Info("marked hosts offline (stale heartbeat)", "n", len(ids)) + // TODO(G1): notify engine once deps.AlertEngine is wired. + // for _, id := range ids { alertEngine.NotifyHostOffline(id) } + _ = ids } case <-pendingDrainTick.C: srv.DrainAllDue(ctx) diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 3a20733..fe51489 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -13,6 +13,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "gitea.dcglab.co.uk/steve/restic-manager/internal/alert" "gitea.dcglab.co.uk/steve/restic-manager/internal/crypto" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/config" "gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui" @@ -29,6 +30,10 @@ type Deps struct { Hub *ws.Hub JobHub *ws.JobHub UI *ui.Renderer + // AlertEngine (optional, wired in G1) receives job-finished and + // host-online events from the WS handler. Nil until G1 constructs + // the engine at boot. + AlertEngine *alert.Engine // Version is the binary's build version, surfaced in the chrome. // Empty falls back to "dev". Version string @@ -225,6 +230,7 @@ func (s *Server) routes(r chi.Router) { Hub: s.deps.Hub, Store: s.deps.Store, JobHub: s.deps.JobHub, + AlertEngine: s.deps.AlertEngine, OnHello: s.onAgentHello, OnScheduleAck: s.applyScheduleAck, OnScheduleFire: s.dispatchScheduledJob, diff --git a/internal/server/ws/handler.go b/internal/server/ws/handler.go index b488095..4ef61ee 100644 --- a/internal/server/ws/handler.go +++ b/internal/server/ws/handler.go @@ -12,6 +12,7 @@ import ( "github.com/coder/websocket" + "gitea.dcglab.co.uk/steve/restic-manager/internal/alert" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/auth" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" @@ -22,6 +23,9 @@ type HandlerDeps struct { Hub *Hub Store *store.Store JobHub *JobHub + // AlertEngine receives job-finished and host-online events so the + // alert engine can evaluate its rules. Optional; nil = no-op. + AlertEngine *alert.Engine // OnHello is called once per successful hello, after the host row // has been touched and the conn registered. Used by the HTTP // layer to push host_credentials down as a config.update before @@ -140,6 +144,9 @@ func runAgentLoop(ctx context.Context, c *Conn, hostID string, deps HandlerDeps) helloPayload.ProtocolVersion, now); err != nil { slog.Error("ws mark host hello failed", "host_id", hostID, "err", err) } + if deps.AlertEngine != nil { + deps.AlertEngine.NotifyHostOnline(hostID) + } deps.Hub.Register(hostID, c) defer deps.Hub.Unregister(hostID, c) @@ -210,6 +217,17 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E if deps.JobHub != nil { deps.JobHub.Broadcast(p.JobID, env) } + if deps.AlertEngine != nil { + if job, err := deps.Store.GetJob(ctx, p.JobID); err == nil && job != nil { + deps.AlertEngine.NotifyJobFinished(alert.JobFinishedEvent{ + HostID: hostID, + JobID: p.JobID, + Kind: job.Kind, + Status: string(p.Status), + When: p.FinishedAt, + }) + } + } case api.MsgLogStream: var p api.LogStreamLine diff --git a/internal/store/hosts.go b/internal/store/hosts.go index 96f85aa..ffb1295 100644 --- a/internal/store/hosts.go +++ b/internal/store/hosts.go @@ -110,6 +110,55 @@ func (s *Store) MarkHostsOfflineStale(ctx context.Context, cutoff time.Time) (in return n, nil } +// MarkHostsOfflineStaleReturnIDs flips any host that hasn't been seen +// since before `cutoff` from 'online' to 'offline' and returns the IDs +// of every host that was flipped. Uses a single transaction. +func (s *Store) MarkHostsOfflineStaleReturnIDs(ctx context.Context, cutoff time.Time) ([]string, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("store: begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + cutoffStr := cutoff.UTC().Format(time.RFC3339Nano) + rows, err := tx.QueryContext(ctx, + `SELECT id FROM hosts + WHERE status = 'online' + AND (last_seen_at IS NULL OR last_seen_at < ?)`, + cutoffStr) + if err != nil { + return nil, fmt.Errorf("store: select stale hosts: %w", err) + } + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("store: scan stale host id: %w", err) + } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("store: iterate stale hosts: %w", err) + } + _ = rows.Close() + + if len(ids) > 0 { + if _, err := tx.ExecContext(ctx, + `UPDATE hosts SET status = 'offline' + WHERE status = 'online' + AND (last_seen_at IS NULL OR last_seen_at < ?)`, + cutoffStr); err != nil { + return nil, fmt.Errorf("store: mark offline: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("store: commit: %w", err) + } + return ids, nil +} + // ListHosts returns every host. Phase 1 callers fit a small fleet in // memory; pagination lands when it matters. func (s *Store) ListHosts(ctx context.Context) ([]Host, error) {