package ws import ( "context" "log/slog" "sync" "time" "gitea.dcglab.co.uk/steve/restic-manager/internal/api" ) // JobHub fans agent-emitted job messages (job.progress, log.stream, // job.started, job.finished) out to every browser currently watching // the matching job_id over /api/jobs/{id}/stream. // // Decoupled from the agent Hub: many subscribers per job_id, all // read-only, lifecycle tied to the browser WS rather than the agent's. type JobHub struct { mu sync.RWMutex subs map[string]map[*Subscriber]struct{} // job_id → set } // NewJobHub returns an empty hub. func NewJobHub() *JobHub { return &JobHub{subs: make(map[string]map[*Subscriber]struct{})} } // Subscriber is one browser WS subscription. Each gets its own // buffered channel so a slow client can't block the broadcaster (or, // transitively, the agent's read loop). // // Two-phase usage: Register() returns a Subscriber that's already in // the hub's set (so concurrent Broadcasts will reach it), but no // pump goroutine runs yet. The caller can prime the channel via Send // — useful for late-subscriber catch-up — and then call Run to start // the pump. Run blocks until ctx is canceled or conn dies, and // unregisters on return. type Subscriber struct { hub *JobHub jobID string ch chan api.Envelope } // Register adds a subscriber for jobID and returns it. The caller // MUST call Run to pump messages — until then the subscriber's // channel buffers silently (up to its capacity, then drops). // // Use Register + Send + Run when you need to prime the channel from // the calling goroutine before the pump starts (e.g. to send a // synthetic job.finished to a late subscriber whose target job is // already terminal). For the simple case use Subscribe. func (h *JobHub) Register(jobID string) *Subscriber { const buf = 64 s := &Subscriber{hub: h, jobID: jobID, ch: make(chan api.Envelope, buf)} h.mu.Lock() if h.subs[jobID] == nil { h.subs[jobID] = make(map[*Subscriber]struct{}) } h.subs[jobID][s] = struct{}{} h.mu.Unlock() return s } // Send pushes env onto the subscriber's channel. Non-blocking: if the // buffer is full, the message is dropped and a warning is logged. func (s *Subscriber) Send(env api.Envelope) { select { case s.ch <- env: default: slog.Warn("ws browser sub: send buffer full, dropping message", "job_id", s.jobID, "type", env.Type) } } // Run pumps messages from the subscriber's channel onto conn until // ctx is canceled or conn dies. Unregisters on return. Caller is // expected to invoke this from the goroutine that owns conn. func (s *Subscriber) Run(ctx context.Context, conn *Conn) { defer s.unregister() for { select { case <-ctx.Done(): return case env, ok := <-s.ch: if !ok { return } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err := conn.Send(sendCtx, env) cancel() if err != nil { slog.Info("ws browser send failed; closing subscriber", "job_id", s.jobID, "err", err) return } } } } func (s *Subscriber) unregister() { s.hub.mu.Lock() if set, ok := s.hub.subs[s.jobID]; ok { delete(set, s) if len(set) == 0 { delete(s.hub.subs, s.jobID) } } s.hub.mu.Unlock() } // Subscribe is a one-call convenience for callers that don't need to // prime the channel before the pump. Equivalent to Register + Run. func (h *JobHub) Subscribe(ctx context.Context, jobID string, conn *Conn) { s := h.Register(jobID) s.Run(ctx, conn) } // Broadcast sends env to every subscriber for jobID. Non-blocking: // if a subscriber's buffer is full, the message is dropped for that // subscriber. Other subscribers are unaffected. func (h *JobHub) Broadcast(jobID string, env api.Envelope) { h.mu.RLock() set := h.subs[jobID] if len(set) == 0 { h.mu.RUnlock() return } targets := make([]*Subscriber, 0, len(set)) for s := range set { targets = append(targets, s) } h.mu.RUnlock() for _, s := range targets { s.Send(env) } } // SubscriberCount returns the number of browsers currently watching // jobID. func (h *JobHub) SubscriberCount(jobID string) int { h.mu.RLock() defer h.mu.RUnlock() return len(h.subs[jobID]) }