P1-26: live job log viewer + WS browser fan-out hub

Closes the P1-21 remainder.

internal/server/ws/jobhub.go — new JobHub. Per-job_id set of
subscribers; each gets a 64-deep buffered channel with a writer
goroutine. Broadcast is non-blocking: if a subscriber is slow,
its channel fills and messages are dropped for that subscriber
only — the agent's read loop is never blocked by a stuck browser.

The agent dispatchAgentMessage path mirrors job.started /
job.progress / log.stream / job.finished envelopes onto the hub
in addition to its existing persistence work. The wire shape is
the same end-to-end, so client-side JS switches on env.type the
same way Go code does.

GET /api/jobs/{id}/stream is the browser endpoint. Auth via
session cookie (HTTP layer); upgrade; subscribe; pump until
context closes.

GET /jobs/{id} renders the live log page. Four states (queued/
running/succeeded/failed) drive the header pill, the progress
bar block, the failure summary panel, and the action button
(Cancel job while running, Back to host afterwards). Already-
persisted log lines are server-rendered on initial load; new
lines arrive over the WS and append to #log-stream. Auto-scrolls
unless the user scrolls up (a "⇢ Follow" pill re-attaches).
On job.finished the page reloads after 600ms to pick up the
final-state header rendered server-side.

POST /hosts/{id}/run-backup now sets HX-Redirect → /jobs/{job_id}
on success so HTMX lands the operator straight on the live log.
For non-HTMX callers (curl / plain form post) it 303s to the
same target.

store.ListJobLogs returns persisted log lines for initial render
on page load.

Browser-verified end-to-end: enrol → run a real backup against a
sibling restic/rest-server → live progress + 11 log lines stream
in → succeeded pill + final stats land after page reload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-01 21:45:56 +01:00
parent cc9dcff816
commit e6729a5a3d
10 changed files with 597 additions and 20 deletions
+16 -5
View File
@@ -22,11 +22,12 @@ import (
// Deps bundles every collaborator the HTTP server depends on. Wired up
// in cmd/server; tests pass a pared-down Deps with fakes.
type Deps struct {
Cfg config.Config
Store *store.Store
AEAD *crypto.AEAD
Hub *ws.Hub
UI *ui.Renderer
Cfg config.Config
Store *store.Store
AEAD *crypto.AEAD
Hub *ws.Hub
JobHub *ws.JobHub
UI *ui.Renderer
// Version is the binary's build version, surfaced in the chrome.
// Empty falls back to "dev".
Version string
@@ -110,6 +111,7 @@ func (s *Server) routes(r chi.Router) {
r.Mount("/ws/agent", ws.AgentHandler(ws.HandlerDeps{
Hub: s.deps.Hub,
Store: s.deps.Store,
JobHub: s.deps.JobHub,
OnHello: s.onAgentHello,
}))
}
@@ -138,6 +140,15 @@ func (s *Server) routes(r chi.Router) {
r.Post("/hosts/new", s.handleUIAddHostPost)
// Host detail (Snapshots tab is the default).
r.Get("/hosts/{id}", s.handleUIHostDetail)
// Live job log.
r.Get("/jobs/{id}", s.handleUIJobDetail)
}
// Browser job-log stream (separate from /ws/agent so the auth
// layer is session-cookie not bearer). Mounted regardless of
// whether the UI is up — JSON callers may also subscribe.
if s.deps.JobHub != nil {
r.Get("/api/jobs/{id}/stream", s.handleJobStream)
}
}
+118 -6
View File
@@ -8,11 +8,13 @@ import (
"strings"
"time"
"github.com/coder/websocket"
"github.com/go-chi/chi/v5"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/auth"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ui"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
"gitea.dcglab.co.uk/steve/restic-manager/web"
)
@@ -138,10 +140,10 @@ func (s *Server) handleUIDashboard(w stdhttp.ResponseWriter, r *stdhttp.Request)
}
// handleUIRunBackup is the form-submit twin of POST /api/hosts/{id}/jobs
// that the dashboard's "Run now" buttons call via hx-post. Returns
// 204 on success — HTMX swap=none means "did the thing, no DOM
// change needed." Failures return text in the body so HTMX's
// response-header inspection surfaces it.
// that the dashboard / host-detail "Run now" buttons call via
// hx-post. On success it sets HX-Redirect → /jobs/{job_id} so the
// operator lands on the live log viewer for the job they just
// kicked off.
func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request) {
u := s.requireUIUser(w, r)
if u == nil {
@@ -157,12 +159,23 @@ func (s *Server) handleUIRunBackup(w stdhttp.ResponseWriter, r *stdhttp.Request)
stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
return
}
_, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobBackup, nil)
res, status, code, msg := s.dispatchJob(r.Context(), storeUser, hostID, api.JobBackup, nil)
if code != "" {
stdhttp.Error(w, msg, status)
return
}
w.WriteHeader(stdhttp.StatusNoContent)
// HTMX (with hx-post + hx-swap=none) doesn't honour HX-Redirect
// when the response itself is a 3xx — fetch follows the redirect
// first and the header is lost. Branch on the HX-Request marker
// so HTMX gets a 200 + HX-Redirect (client-side window.location
// hop), while plain form-post / curl callers get the 303.
target := "/jobs/" + res.JobID
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("HX-Redirect", target)
w.WriteHeader(stdhttp.StatusOK)
return
}
stdhttp.Redirect(w, r, target, stdhttp.StatusSeeOther)
}
// addHostPage carries the form state into the Add host template.
@@ -332,6 +345,105 @@ func (s *Server) publicURL(r *stdhttp.Request) string {
return scheme + "://" + r.Host
}
// jobDetailPage carries everything the live-log template renders.
// It is populated by handleUIJobDetail and handed to the renderer as
// the view's Page value.
type jobDetailPage struct {
	// Job is the job row being viewed.
	Job store.Job
	// Host is the host the job belongs to (looked up via Job.HostID).
	Host store.Host
	// Logs holds the already-persisted log lines, in seq order, that
	// are rendered server-side on initial load.
	Logs []store.JobLogLine
	// NextSeq is set by the handler to the Seq of the last line in
	// Logs, or 0 when Logs is empty. Note that despite the name this
	// is the highest seq already rendered, not the one after it.
	NextSeq int64
	// IsActive is true while status is queued|running.
	IsActive bool
}
// handleUIJobDetail serves the live job log page. It loads the job,
// its host, and every log line persisted so far, then renders the
// "job_detail" template; lines produced after this point are expected
// to reach the page over the WS stream rather than through this
// handler.
//
// Responses: 404 when the id is empty or the job does not exist,
// 500 ("internal") on any other store or render failure. When
// requireUIUser returns nil the handler returns without writing.
func (s *Server) handleUIJobDetail(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	user := s.requireUIUser(w, r)
	if user == nil {
		return
	}

	ctx := r.Context()
	id := chi.URLParam(r, "id")
	if id == "" {
		stdhttp.NotFound(w, r)
		return
	}

	job, err := s.deps.Store.GetJob(ctx, id)
	switch {
	case errors.Is(err, store.ErrNotFound):
		stdhttp.NotFound(w, r)
		return
	case err != nil:
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	host, err := s.deps.Store.GetHost(ctx, job.HostID)
	if err != nil {
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	// afterSeq=0, limit=0: everything persisted so far, uncapped.
	lines, err := s.deps.Store.ListJobLogs(ctx, id, 0, 0)
	if err != nil {
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	view := s.baseView(user, "dashboard")
	view.Title = job.Kind + " · " + host.Name + " · restic-manager"

	page := jobDetailPage{Job: *job, Host: *host, Logs: lines}
	if len(lines) > 0 {
		// Seq of the last line rendered server-side; stays 0 when
		// nothing has been persisted yet.
		page.NextSeq = lines[len(lines)-1].Seq
	}
	switch job.Status {
	case "queued", "running":
		page.IsActive = true
	}
	view.Page = page

	if err := s.deps.UI.Render(w, "job_detail", view); err != nil {
		slog.Error("ui: render job_detail", "err", err)
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
	}
}
// handleJobStream is the browser-side WS endpoint. Auth is via the
// session cookie (the HTTP layer does the lookup before upgrading).
// On connect we subscribe to JobHub for the given job_id; this
// handler's goroutine then pumps fan-out messages to the client until
// the browser closes the socket or a send fails. The stream is not
// torn down by job.finished itself — the client decides when to leave.
//
// Messages on the wire are the same api.Envelope shape as on the
// agent side, so the client-side JS can switch on env.type the
// same way our Go code does.
//
// Responses before the upgrade: 401 without a session, 400 for an
// empty id, 404 when the job does not exist, 500 on any other store
// failure.
func (s *Server) handleJobStream(w stdhttp.ResponseWriter, r *stdhttp.Request) {
	if u, _ := s.sessionUser(r); u == nil {
		stdhttp.Error(w, "unauthorized", stdhttp.StatusUnauthorized)
		return
	}
	jobID := chi.URLParam(r, "id")
	if jobID == "" {
		stdhttp.Error(w, "missing job id", stdhttp.StatusBadRequest)
		return
	}
	if _, err := s.deps.Store.GetJob(r.Context(), jobID); err != nil {
		// Only a genuinely missing job is a 404; a store failure must
		// not masquerade as "no such job" (matches handleUIJobDetail).
		if errors.Is(err, store.ErrNotFound) {
			stdhttp.NotFound(w, r)
			return
		}
		slog.Error("ws browser: load job", "job_id", jobID, "err", err)
		stdhttp.Error(w, "internal", stdhttp.StatusInternalServerError)
		return
	}

	// Keep the library's default Origin verification. This endpoint is
	// authenticated by an ambient session cookie, so accepting upgrades
	// from any Origin would let a third-party page open the stream with
	// the operator's credentials (cross-site WebSocket hijacking).
	// Requests without an Origin header (non-browser clients) are still
	// accepted by the default check.
	// NOTE(review): deployments behind a proxy that rewrites Host will
	// need AcceptOptions.OriginPatterns set to the public host.
	conn, err := websocket.Accept(w, r, nil)
	if err != nil {
		slog.Warn("ws browser accept failed", "job_id", jobID, "err", err)
		return
	}
	defer func() { _ = conn.Close(websocket.StatusNormalClosure, "") }()

	// The stream is server → browser only. After the upgrade the
	// request context no longer tracks the peer going away, so without
	// a reader a closed tab would only be noticed on the next failed
	// send — which never comes once the job has stopped emitting.
	// CloseRead reads (and discards) in the background and cancels ctx
	// when the connection closes, letting Subscribe unwind promptly.
	// NOTE(review): assumes ws.NewConn does not start its own reader on
	// conn — confirm.
	ctx := conn.CloseRead(r.Context())

	// Wrap so we get the same Send semantics as the agent path.
	c := ws.NewConn("browser-"+jobID, conn)
	s.deps.JobHub.Subscribe(ctx, jobID, c)
}
// userByID fetches the full store.User the UI session represents.
// Returns the user, ok-flag, error. Used by handlers that need the
// store-side row (e.g. for audit_log.user_id) rather than just the
+8 -1
View File
@@ -18,13 +18,20 @@ func funcMap() template.FuncMap {
"comma": formatComma,
"deref": derefStr,
"timeNotZero": func(t *time.Time) bool { return t != nil && !t.IsZero() },
"joinDot": func(parts []string) string { return strings.Join(parts, " · ") },
"joinDot": func(parts []string) string { return strings.Join(parts, " · ") },
"absTime": func(t time.Time) string {
if t.IsZero() {
return "—"
}
return t.Format("2006-01-02 15:04:05")
},
"derefInt": func(p *int) int {
if p == nil {
return 0
}
return *p
},
"sub": func(a, b int) int { return a - b },
}
}
+21 -6
View File
@@ -19,8 +19,9 @@ import (
// HandlerDeps is the set of collaborators the agent WS handler needs.
type HandlerDeps struct {
Hub *Hub
Store *store.Store
Hub *Hub
Store *store.Store
JobHub *JobHub
// OnHello is called once per successful hello, after the host row
// has been touched and the conn registered. Used by the HTTP
// layer to push host_credentials down as a config.update before
@@ -172,12 +173,20 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
if err := deps.Store.MarkJobStarted(ctx, p.JobID, p.StartedAt); err != nil {
slog.Warn("ws: mark job started", "job_id", p.JobID, "err", err)
}
if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env)
}
case api.MsgJobProgress:
// We don't persist every progress tick; the live UI subscribes
// to a fan-out channel that lands with P1-21 / the UI work.
// TODO: implement the ws fan-out hub for browsers.
_ = env
// Progress ticks aren't persisted (1Hz × every job × every
// path-walk would dwarf the rest of the DB). The live UI
// subscribes to JobHub and gets them in real time; once a
// job finishes the final summary lands via job.finished.
var p api.JobProgressPayload
_ = env.UnmarshalPayload(&p)
if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env)
}
case api.MsgJobFinished:
var p api.JobFinishedPayload
@@ -187,6 +196,9 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
string(p.Status), p.ExitCode, p.Stats, errMsg, p.FinishedAt); err != nil {
slog.Warn("ws: mark job finished", "job_id", p.JobID, "err", err)
}
if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env)
}
case api.MsgLogStream:
var p api.LogStreamLine
@@ -195,6 +207,9 @@ func dispatchAgentMessage(ctx context.Context, c *Conn, hostID string, env api.E
string(p.Stream), p.Payload); err != nil {
slog.Warn("ws: append job log", "job_id", p.JobID, "err", err)
}
if deps.JobHub != nil {
deps.JobHub.Broadcast(p.JobID, env)
}
case api.MsgSnapshotsRpt:
var p api.SnapshotsReportPayload
+126
View File
@@ -0,0 +1,126 @@
package ws
import (
"context"
"log/slog"
"sync"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// JobHub fans agent-emitted job messages (job.progress, log.stream,
// job.started, job.finished) out to every browser currently watching
// the matching job_id over /api/jobs/{id}/stream.
//
// Decoupled from the agent Hub: many subscribers per job_id, all
// read-only, lifecycle tied to the browser WS rather than the agent's.
//
// Construct with NewJobHub; Subscribe writes into subs and would
// panic on the nil map of a zero-value JobHub.
type JobHub struct {
	// mu guards subs. Subscribe takes the write lock to add/remove
	// entries; Broadcast and SubscriberCount take the read lock.
	mu sync.RWMutex
	// subs maps job_id → set of live subscribers. A job's entry is
	// removed once its last subscriber leaves.
	subs map[string]map[*subscriber]struct{} // job_id → set
}
// NewJobHub builds a JobHub with no subscribers. The subscription
// index is allocated up front so Subscribe can write into it
// immediately.
func NewJobHub() *JobHub {
	hub := new(JobHub)
	hub.subs = map[string]map[*subscriber]struct{}{}
	return hub
}
// subscriber is one browser WS subscription. Each gets its own
// buffered channel so a slow client can't block the broadcaster
// (or, transitively, the agent's read loop): Broadcast does a
// non-blocking send into ch and Subscribe drains it onto the
// connection.
type subscriber struct {
	// jobID is the job this subscription was registered for.
	jobID string
	// ch queues envelopes awaiting delivery to the browser. It is
	// created with a fixed buffer in Subscribe; when full, Broadcast
	// drops the message for this subscriber only.
	ch chan api.Envelope
}
// Subscribe registers a new subscriber for jobID and then blocks,
// forwarding every envelope queued for it onto conn. It returns when
// ctx is cancelled or a send on conn fails (each send is bounded by a
// 5s timeout); the subscriber is deregistered on the way out. Call it
// from the goroutine that owns conn.
//
// The per-subscriber queue is 64 deep. If it fills, Broadcast drops
// messages for this subscriber rather than blocking. The browser will
// see a gap; on completion the page can re-fetch persisted log_lines
// to reconcile.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
	const queueDepth = 64
	sub := &subscriber{
		jobID: jobID,
		ch:    make(chan api.Envelope, queueDepth),
	}

	// Register.
	h.mu.Lock()
	watchers := h.subs[jobID]
	if watchers == nil {
		watchers = make(map[*subscriber]struct{})
		h.subs[jobID] = watchers
	}
	watchers[sub] = struct{}{}
	h.mu.Unlock()

	// Deregister on exit, pruning the job's entry once it is empty.
	defer func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		current, found := h.subs[jobID]
		if !found {
			return
		}
		delete(current, sub)
		if len(current) == 0 {
			delete(h.subs, jobID)
		}
	}()

	// Drain pump.
	for {
		var (
			env  api.Envelope
			open bool
		)
		select {
		case <-ctx.Done():
			return
		case env, open = <-sub.ch:
		}
		if !open {
			return
		}

		// Bound each write so a wedged socket cannot park this
		// goroutine indefinitely.
		sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err := conn.Send(sendCtx, env)
		cancel()
		if err != nil {
			slog.Info("ws browser send failed; closing subscriber", "job_id", jobID, "err", err)
			return
		}
	}
}
// Broadcast offers env to every subscriber currently registered for
// jobID. Delivery never blocks: a subscriber whose queue is full
// simply misses this message (a warning is logged for each drop) and
// the remaining subscribers are still served. With no subscribers it
// is a no-op.
//
// Safe to call from any goroutine. The read lock is held only long
// enough to copy the subscriber set; the channel sends happen after
// it has been released.
func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
	var targets []*subscriber

	h.mu.RLock()
	if watchers := h.subs[jobID]; len(watchers) > 0 {
		targets = make([]*subscriber, 0, len(watchers))
		for sub := range watchers {
			targets = append(targets, sub)
		}
	}
	h.mu.RUnlock()

	for i := range targets {
		select {
		case targets[i].ch <- env:
			continue
		default:
		}
		// Queue full — drop for this subscriber only. One log line
		// per dropped message; a flood of these means the browser is
		// genuinely stuck, not just slow.
		slog.Warn("ws browser sub: send buffer full, dropping message",
			"job_id", jobID, "type", env.Type)
	}
}
// SubscriberCount reports how many browser subscriptions are
// registered for jobID right now (0 when nobody is watching).
// Intended for diagnostics and any future "N people watching"
// indicator.
func (h *JobHub) SubscriberCount(jobID string) int {
	h.mu.RLock()
	n := len(h.subs[jobID])
	h.mu.RUnlock()
	return n
}
+42
View File
@@ -91,6 +91,48 @@ func (s *Store) AppendJobLog(ctx context.Context, jobID string, seq int64, ts ti
return nil
}
// JobLogLine is one persisted log line, ready to render. Values are
// read from the job_logs table by ListJobLogs.
type JobLogLine struct {
	// Seq is the line's sequence number within its job; ListJobLogs
	// orders and paginates by it.
	Seq int64
	// TS is the line's timestamp, parsed from the RFC3339Nano text
	// stored in the ts column.
	TS time.Time
	// Stream identifies the source of the line.
	Stream string // stdout|stderr|event
	// Payload is the stored payload column, unmodified — presumably
	// the text of the line; confirm against the writer.
	Payload string
}
// ListJobLogs loads the persisted log lines of jobID in ascending
// seq order.
//
// Only rows with seq strictly greater than afterSeq are returned, so
// pagers and reconnecting clients can ask for just the tail; pass 0
// to read from the start (NOTE(review): this assumes seq numbering
// begins above 0 — a line stored with seq 0 would be excluded).
// A positive limit caps the number of rows; 0 (or less) means no cap.
//
// The result is nil when no rows match. Errors from the query, the
// row scan, or timestamp parsing are wrapped with a "store:" prefix.
func (s *Store) ListJobLogs(ctx context.Context, jobID string, afterSeq int64, limit int) ([]JobLogLine, error) {
	query := `SELECT seq, ts, stream, payload FROM job_logs
		WHERE job_id = ? AND seq > ? ORDER BY seq ASC`
	params := []any{jobID, afterSeq}
	if limit > 0 {
		query += ` LIMIT ?`
		params = append(params, limit)
	}

	rows, err := s.db.QueryContext(ctx, query, params...)
	if err != nil {
		return nil, fmt.Errorf("store: list job logs: %w", err)
	}
	defer rows.Close()

	var lines []JobLogLine
	for rows.Next() {
		var (
			line  JobLogLine
			rawTS string
		)
		if err := rows.Scan(&line.Seq, &rawTS, &line.Stream, &line.Payload); err != nil {
			return nil, fmt.Errorf("store: scan job log: %w", err)
		}
		// Timestamps are stored as text; decode before handing the
		// line to callers.
		parsed, err := time.Parse(time.RFC3339Nano, rawTS)
		if err != nil {
			return nil, fmt.Errorf("store: parse job log ts: %w", err)
		}
		line.TS = parsed
		lines = append(lines, line)
	}
	return lines, rows.Err()
}
// GetJob returns a job row.
func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) {
row := s.db.QueryRowContext(ctx,