server: serialize DrainPending per host (avoid drain double-dispatch)
Add a per-host drain mutex (drainLocks map guarded by drainLocksMu) on the Server struct. DrainPending acquires it with TryLock: if a drain is already in-flight for this host, the call returns immediately — the running drain will see every pending row. This prevents the on-hello goroutine and the 30s tick from both listing the same host's rows and dispatching them twice. Update three existing tests that called srv.DrainPending explicitly after the on-hello goroutine had already been spawned: replace the now-redundant direct call with a waitForPendingCount poll so they don't race the goroutine's mutex ownership. Add TestDrainPendingSerializesPerHost which fires 10 concurrent DrainPending goroutines against a 5-row queue and asserts exactly 5 job rows result.
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
@@ -34,7 +35,18 @@ const (
|
||||
// row is dropped (audit-logged as abandoned). Otherwise we attempt
|
||||
// dispatch; success deletes the row, failure bumps the attempt and
|
||||
// reschedules with exponential backoff.
|
||||
//
|
||||
// A per-host mutex (hostDrainMutex) ensures that the on-hello goroutine
|
||||
// and the 30s tick cannot process the same host concurrently. If a drain
|
||||
// is already in-flight for this host, the call returns immediately — the
|
||||
// running drain will see any rows we'd have processed.
|
||||
func (s *Server) DrainPending(ctx context.Context, hostID string) {
|
||||
mu := s.hostDrainMutex(hostID)
|
||||
if !mu.TryLock() {
|
||||
return
|
||||
}
|
||||
defer mu.Unlock()
|
||||
|
||||
runs, err := s.deps.Store.ListPendingRunsForHost(ctx, hostID)
|
||||
if err != nil {
|
||||
slog.Warn("drain pending: list", "host_id", hostID, "err", err)
|
||||
@@ -148,6 +160,24 @@ func (s *Server) abandonPending(ctx context.Context, p store.PendingRun, reason
|
||||
}
|
||||
}
|
||||
|
||||
// hostDrainMutex returns the per-host mutex for DrainPending,
|
||||
// creating it on first request. The map is guarded by drainLocksMu.
|
||||
// Mutex objects are never deleted from the map — there are at most
|
||||
// len(hosts) entries, which is bounded by the fleet size.
|
||||
func (s *Server) hostDrainMutex(hostID string) *sync.Mutex {
|
||||
s.drainLocksMu.Lock()
|
||||
defer s.drainLocksMu.Unlock()
|
||||
if s.drainLocks == nil {
|
||||
s.drainLocks = make(map[string]*sync.Mutex)
|
||||
}
|
||||
mu, ok := s.drainLocks[hostID]
|
||||
if !ok {
|
||||
mu = &sync.Mutex{}
|
||||
s.drainLocks[hostID] = mu
|
||||
}
|
||||
return mu
|
||||
}
|
||||
|
||||
// DrainAllDue is the 30s-ticker entrypoint. Walks rows whose
|
||||
// next_attempt_at <= now (DuePendingRuns), dedupes by host, and calls
|
||||
// DrainPending per host. The DrainPending then re-walks the host's
|
||||
|
||||
Reference in New Issue
Block a user