package ws

import (
	"context"
	"log/slog"
	"sync"
	"sync/atomic"
	"time"

	"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
)

// JobHub fans agent-emitted job messages (job.progress, log.stream,
// job.started, job.finished) out to every browser currently watching
// the matching job_id over /api/jobs/{id}/stream.
//
// Decoupled from the agent Hub: many subscribers per job_id, all
// read-only, lifecycle tied to the browser WS rather than the agent's.
type JobHub struct {
	mu   sync.RWMutex
	subs map[string]map[*subscriber]struct{} // job_id → set of subscribers
}

// NewJobHub returns an empty hub.
func NewJobHub() *JobHub {
	return &JobHub{subs: make(map[string]map[*subscriber]struct{})}
}

// subscriber is one browser WS subscription. Each has its own buffered
// channel, drained by the goroutine that called Subscribe, so a slow
// client can't block the broadcaster (or, transitively, the agent's
// read loop).
type subscriber struct {
	jobID string
	ch    chan api.Envelope
	// dropped counts envelopes Broadcast discarded because ch was full.
	// Atomic because concurrent Broadcast calls update it after
	// releasing h.mu.
	dropped atomic.Uint64
}

// Subscribe registers a new subscriber for jobID, then pumps messages
// from that subscriber's channel onto conn. It blocks until ctx is
// cancelled or a send to conn fails, and unregisters the subscriber
// before returning. Call it from the goroutine that owns conn: that
// goroutine is the only writer for this subscription.
//
// If the subscriber's channel fills, Broadcast drops messages for it
// rather than blocking. The browser will see a gap; on completion the
// page can re-fetch persisted log_lines to reconcile.
func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) {
	// Per-subscriber buffer depth; absorbs bursts while a conn.Send is
	// in flight.
	const buf = 64
	s := &subscriber{jobID: jobID, ch: make(chan api.Envelope, buf)}

	h.mu.Lock()
	if h.subs[jobID] == nil {
		h.subs[jobID] = make(map[*subscriber]struct{})
	}
	h.subs[jobID][s] = struct{}{}
	h.mu.Unlock()

	defer func() {
		h.mu.Lock()
		if set, ok := h.subs[jobID]; ok {
			delete(set, s)
			if len(set) == 0 {
				delete(h.subs, jobID)
			}
		}
		h.mu.Unlock()

		// Broadcast throttles its per-drop warning, so report the total
		// dropped for this subscription so far when it goes away.
		if n := s.dropped.Load(); n > 0 {
			slog.Warn("ws browser sub: closed after dropping messages",
				"job_id", jobID, "dropped_total", n)
		}
	}()

	// Drain pump: forward each buffered envelope to the browser.
	for {
		select {
		case <-ctx.Done():
			return
		case env, ok := <-s.ch:
			if !ok {
				// Defensive: nothing in this file closes s.ch.
				return
			}
			// Bound each write to 5s (and to ctx), assuming Conn.Send
			// honours its context, so a wedged browser can't pin this
			// goroutine indefinitely.
			sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
			err := conn.Send(sendCtx, env)
			cancel()
			if err != nil {
				slog.Info("ws browser send failed; closing subscriber",
					"job_id", jobID, "err", err)
				return
			}
		}
	}
}

// Broadcast sends env to every subscriber for jobID. It never blocks:
// if a subscriber's buffer is full, the message is dropped for that
// subscriber only, and other subscribers are unaffected.
//
// Drops are counted per subscriber. A warning is logged on the first
// drop and on every 100th drop after that, so a stuck browser cannot
// flood the log with one line per message.
//
// Safe to call from any goroutine. It holds an RLock only long enough
// to snapshot the subscriber set, then releases it before sending.
func (h *JobHub) Broadcast(jobID string, env api.Envelope) {
	// Log the buffer-full warning on the 1st drop and every Nth after.
	const dropLogEvery = 100

	h.mu.RLock()
	set := h.subs[jobID]
	if len(set) == 0 {
		h.mu.RUnlock()
		return
	}
	targets := make([]*subscriber, 0, len(set))
	for s := range set {
		targets = append(targets, s)
	}
	h.mu.RUnlock()

	// A snapshotted subscriber may unregister before we send to it.
	// That is harmless: s.ch is never closed in this file, so the
	// message at worst lands in an abandoned buffer.
	for _, s := range targets {
		select {
		case s.ch <- env:
		default:
			n := s.dropped.Add(1)
			if n == 1 || n%dropLogEvery == 0 {
				slog.Warn("ws browser sub: send buffer full, dropping message",
					"job_id", jobID, "type", env.Type, "dropped_total", n)
			}
		}
	}
}

// SubscriberCount returns the number of browsers currently watching
// jobID. Used for diagnostics / future "this many people are
// watching" counters.
func (h *JobHub) SubscriberCount(jobID string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return len(h.subs[jobID])
}