server: enqueue pending_runs when scheduled-job dispatch fails
When dispatchBackupForGroup's conn.Send errors, queue a pending_runs row (attempt=1, next_attempt_at = now + group.RetryBackoffSeconds) instead of silently dropping the fire. The orphaned queued job row is left behind for forensic visibility — the drainer will create a fresh job row on its retry. Also adds Store.ListPendingRunsForHost — the on-reconnect drain walks every row for the host, regardless of due-ness, since the host being back makes 'due' irrelevant.
This commit is contained in:
@@ -72,6 +72,43 @@ func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) (
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// ListPendingRunsForHost returns every pending row for the host
|
||||
// (regardless of next_attempt_at), ordered by next_attempt_at
|
||||
// ascending. Used by the on-reconnect drain — when a host comes
|
||||
// back, we walk every pending row for it, not just the due ones,
|
||||
// because the host being back makes "due" unimportant: every row
|
||||
// is dispatchable now.
|
||||
func (st *Store) ListPendingRunsForHost(ctx context.Context, hostID string) ([]PendingRun, error) {
|
||||
rows, err := st.db.QueryContext(ctx,
|
||||
`SELECT id, schedule_id, source_group_id, host_id, attempt,
|
||||
next_attempt_at, scheduled_at, COALESCE(last_error, '')
|
||||
FROM pending_runs
|
||||
WHERE host_id = ?
|
||||
ORDER BY next_attempt_at`,
|
||||
hostID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store: list pending runs for host: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
out := []PendingRun{}
|
||||
for rows.Next() {
|
||||
var p PendingRun
|
||||
var nextAt, scheduledAt string
|
||||
if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID,
|
||||
&p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil {
|
||||
p.NextAttemptAt = t
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil {
|
||||
p.ScheduledAt = t
|
||||
}
|
||||
out = append(out, p)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// DeletePendingRun removes a row by id. Called after successful
|
||||
// dispatch or after exceeding retry_max.
|
||||
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
|
||||
|
||||
Reference in New Issue
Block a user