Files
restic-manager/internal/server/ws/jobhub.go
T
steve b6f8de1dcc lint: drive baseline to zero, drop only-new-issues gate
Cleanup pass over the repo so CI can enforce lint going forward
without the only-new-issues escape hatch:

* gofumpt -w across the tree (31 hits, all formatting)
* misspell --fix (25 hits, US-locale spelling) — but reverted on
  api.JobCancelled = "cancelled" since that literal is the wire +
  DB CHECK constraint value, plus matched the case in store/fleet.go
  back to "cancelled" and added //nolint:misspell on both for the
  next time someone reaches for the auto-fix
* Wrap every `defer rows.Close()` / `defer stmt.Close()` /
  `defer res.Body.Close()` in `defer func() { _ = x.Close() }()`
  to satisfy errcheck without losing the close itself
* websocket.Dial callers (1 prod, 4 tests) now capture + close the
  upgrade response Body — coder/websocket can return res with a nil
  Body on success, so the test deferred-closes guard against that
* Annotate the two genuine-by-design nilerr cases with //nolint
  comments explaining why nil-on-error is the contract (cookie
  missing = no session; ctx cancelled mid-backoff = clean shutdown)
* Add brief godoc on the 10 exported const groups + types that
  revive flagged (api.HostOS/HostArch/JobKind/JobStatus/LogStream/
  ErrorCode, restic.EventKind, store.Role, web.FS)
* Drop the unused (*Server).userByID method
* Inline the unparam baseView(active) — every UI page is under
  the dashboard primary nav today

Result: `golangci-lint run ./...` reports 0 issues. CI lint job
no longer needs only-new-issues: true; X-06 follow-up entry in
tasks.md removed.
2026-05-03 16:15:17 +01:00

146 lines
4.1 KiB
Go

package ws
import (
"context"
"log/slog"
"sync"
"time"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)
// JobHub fans agent-emitted job messages (job.progress, log.stream,
// job.started, job.finished) out to every browser currently watching
// the matching job_id over /api/jobs/{id}/stream.
//
// Decoupled from the agent Hub: many subscribers per job_id, all
// read-only, lifecycle tied to the browser WS rather than the agent's.
//
// Construct with NewJobHub: Register writes into subs, so a zero-value
// JobHub (nil map) would panic on the first registration.
type JobHub struct {
// mu guards subs. Register and unregister take the write lock;
// Broadcast and SubscriberCount take the read lock.
mu sync.RWMutex
// subs holds, per job ID, the set of subscribers currently registered
// for that job. A job's entry is removed once its set becomes empty.
subs map[string]map[*Subscriber]struct{} // job_id → set
}
// NewJobHub builds a JobHub with no subscribers. The subscription map
// is allocated here so that Register can insert into it right away.
func NewJobHub() *JobHub {
	hub := new(JobHub)
	hub.subs = map[string]map[*Subscriber]struct{}{}
	return hub
}
// Subscriber is one browser WS subscription. Each gets its own
// buffered channel so a slow client can't block the broadcaster (or,
// transitively, the agent's read loop).
//
// Two-phase usage: Register() returns a Subscriber that's already in
// the hub's set (so concurrent Broadcasts will reach it), but no
// pump goroutine runs yet. The caller can prime the channel via Send
// — useful for late-subscriber catch-up — and then call Run to start
// the pump. Run blocks until ctx is canceled or conn dies, and
// unregisters on return.
type Subscriber struct {
// hub is the JobHub this subscriber was registered with; unregister
// uses it to remove the subscriber from the hub's set.
hub *JobHub
// jobID is the key under which this subscriber sits in hub.subs. It is
// also attached to log lines emitted by Send and Run.
jobID string
// ch queues envelopes between Send (non-blocking producer) and Run
// (consumer). Register creates it with a fixed buffer size.
ch chan api.Envelope
}
// Register creates a Subscriber for jobID, inserts it into the hub's
// set for that job, and hands it back. From this point on Broadcast
// will reach it, but nothing drains its channel until the caller
// invokes Run: messages accumulate in the buffer and, once it is
// full, further ones are dropped by Send.
//
// The Register + Send + Run sequence exists for callers that want to
// queue something from their own goroutine before the pump starts
// (e.g. a synthetic job.finished for a late subscriber whose job is
// already terminal). When no priming is needed, Subscribe does both
// steps in one call.
func (h *JobHub) Register(jobID string) *Subscriber {
	// Per-subscriber queue depth.
	const buf = 64

	sub := &Subscriber{
		hub:   h,
		jobID: jobID,
		ch:    make(chan api.Envelope, buf),
	}

	h.mu.Lock()
	watchers := h.subs[jobID]
	if watchers == nil {
		// First subscriber for this job: create its set lazily.
		watchers = make(map[*Subscriber]struct{})
		h.subs[jobID] = watchers
	}
	watchers[sub] = struct{}{}
	h.mu.Unlock()

	return sub
}
// Send enqueues env on the subscriber's channel without ever blocking.
// When the buffer has no room the envelope is discarded and a warning
// carrying the job ID and envelope type is logged.
func (s *Subscriber) Send(env api.Envelope) {
	select {
	case s.ch <- env:
		return
	default:
		// Buffer full: fall through to the drop warning below.
	}
	slog.Warn("ws browser sub: send buffer full, dropping message",
		"job_id", s.jobID, "type", env.Type)
}
// Run drains the subscriber's channel and writes each envelope to
// conn. It returns when ctx is canceled, when the channel is closed,
// or when a write to conn fails, and it always unregisters the
// subscriber on the way out. It blocks, so call it from the goroutine
// that owns conn.
func (s *Subscriber) Run(ctx context.Context, conn *Conn) {
	// Upper bound on a single write to the browser connection.
	const sendTimeout = 5 * time.Second

	defer s.unregister()

	for {
		// Wait for the next envelope or for cancellation.
		var env api.Envelope
		select {
		case <-ctx.Done():
			return
		case next, open := <-s.ch:
			if !open {
				return
			}
			env = next
		}

		// Each write gets its own deadline derived from ctx; release the
		// timer immediately rather than deferring inside the loop.
		sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
		err := conn.Send(sendCtx, env)
		cancel()
		if err == nil {
			continue
		}

		slog.Info("ws browser send failed; closing subscriber",
			"job_id", s.jobID, "err", err)
		return
	}
}
// unregister removes s from its hub's set for s.jobID. If that leaves
// the set empty, the job's entry is deleted from the hub as well.
// Calling it when the job has no entry is a no-op.
func (s *Subscriber) unregister() {
	hub := s.hub
	hub.mu.Lock()
	defer hub.mu.Unlock()

	watchers, found := hub.subs[s.jobID]
	if !found {
		return
	}
	delete(watchers, s)
	if len(watchers) == 0 {
		delete(hub.subs, s.jobID)
	}
}
// Subscribe registers a subscriber for jobID and immediately runs its
// pump on conn, blocking until Run returns. It is shorthand for
// Register followed by Run, for callers with nothing to queue before
// the pump starts.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
	h.Register(jobID).Run(ctx, conn)
}
// Broadcast delivers env to every subscriber currently registered for
// jobID. Delivery goes through Send, so it never blocks: a subscriber
// whose buffer is full simply misses this message while the others
// still receive it. With no subscribers for jobID it does nothing.
func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
	var targets []*Subscriber

	// Snapshot the set under the read lock so the sends below happen
	// without holding it.
	h.mu.RLock()
	if watchers := h.subs[jobID]; len(watchers) > 0 {
		targets = make([]*Subscriber, 0, len(watchers))
		for sub := range watchers {
			targets = append(targets, sub)
		}
	}
	h.mu.RUnlock()

	for i := range targets {
		targets[i].Send(env)
	}
}
// SubscriberCount reports how many subscribers are registered for
// jobID at the moment of the call; it is zero for a job nobody is
// watching.
func (h *JobHub) SubscriberCount(jobID string) int {
	h.mu.RLock()
	n := len(h.subs[jobID])
	h.mu.RUnlock()
	return n
}