server: enqueue pending_runs when scheduled-job dispatch fails
When dispatchBackupForGroup's conn.Send errors, queue a pending_runs row (attempt=1, next_attempt_at = now + group.RetryBackoffSeconds) instead of silently dropping the fire. The orphaned queued job row is left behind for forensic visibility — the drainer will create a fresh job row on its retry. Also adds Store.ListPendingRunsForHost — the on-reconnect drain walks every row for the host, regardless of due-ness, since the host being back makes 'due' irrelevant.
This commit is contained in:
@@ -206,8 +206,30 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host
|
|||||||
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := conn.Send(sendCtx, env); err != nil {
|
if err := conn.Send(sendCtx, env); err != nil {
|
||||||
slog.Warn("schedule.fire: send command.run",
|
slog.Warn("schedule.fire: send command.run failed, queueing for retry",
|
||||||
"host_id", hostID, "schedule_id", scheduleID, "err", err)
|
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err)
|
||||||
|
backoff := time.Duration(g.RetryBackoffSeconds) * time.Second
|
||||||
|
if backoff <= 0 {
|
||||||
|
backoff = 60 * time.Second
|
||||||
|
}
|
||||||
|
if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{
|
||||||
|
ID: ulid.Make().String(),
|
||||||
|
ScheduleID: scheduleID,
|
||||||
|
SourceGroupID: g.ID,
|
||||||
|
HostID: hostID,
|
||||||
|
Attempt: 1,
|
||||||
|
NextAttemptAt: time.Now().UTC().Add(backoff),
|
||||||
|
ScheduledAt: scheduledAt,
|
||||||
|
LastError: err.Error(),
|
||||||
|
}); enqueueErr != nil {
|
||||||
|
slog.Warn("schedule.fire: enqueue pending run failed",
|
||||||
|
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr)
|
||||||
|
}
|
||||||
|
// The job row was already persisted earlier in this function — leave
|
||||||
|
// it in `queued` status. The drainer will re-dispatch (creating a
|
||||||
|
// new job row) and the orphaned queued row stays for forensic
|
||||||
|
// visibility. Don't delete it: the audit trail still wants to know
|
||||||
|
// "we tried and the wire was wedged."
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
||||||
|
|||||||
@@ -72,6 +72,43 @@ func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) (
|
|||||||
return out, rows.Err()
|
return out, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListPendingRunsForHost returns every pending row for the host
|
||||||
|
// (regardless of next_attempt_at), ordered by next_attempt_at
|
||||||
|
// ascending. Used by the on-reconnect drain — when a host comes
|
||||||
|
// back, we walk every pending row for it, not just the due ones,
|
||||||
|
// because the host being back makes "due" unimportant: every row
|
||||||
|
// is dispatchable now.
|
||||||
|
func (st *Store) ListPendingRunsForHost(ctx context.Context, hostID string) ([]PendingRun, error) {
|
||||||
|
rows, err := st.db.QueryContext(ctx,
|
||||||
|
`SELECT id, schedule_id, source_group_id, host_id, attempt,
|
||||||
|
next_attempt_at, scheduled_at, COALESCE(last_error, '')
|
||||||
|
FROM pending_runs
|
||||||
|
WHERE host_id = ?
|
||||||
|
ORDER BY next_attempt_at`,
|
||||||
|
hostID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: list pending runs for host: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
out := []PendingRun{}
|
||||||
|
for rows.Next() {
|
||||||
|
var p PendingRun
|
||||||
|
var nextAt, scheduledAt string
|
||||||
|
if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID,
|
||||||
|
&p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil {
|
||||||
|
p.NextAttemptAt = t
|
||||||
|
}
|
||||||
|
if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil {
|
||||||
|
p.ScheduledAt = t
|
||||||
|
}
|
||||||
|
out = append(out, p)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// DeletePendingRun removes a row by id. Called after successful
|
// DeletePendingRun removes a row by id. Called after successful
|
||||||
// dispatch or after exceeding retry_max.
|
// dispatch or after exceeding retry_max.
|
||||||
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
|
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
|
||||||
|
|||||||
@@ -219,3 +219,78 @@ func TestPendingRunQueue(t *testing.T) {
|
|||||||
t.Fatalf("after delete: %v", due)
|
t.Fatalf("after delete: %v", due)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListPendingRunsForHost(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
s := openTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
hostA := makeSchedHost(t, s)
|
||||||
|
hostB := "01HPENDLISTHOSTB00000001"
|
||||||
|
if err := s.CreateHost(ctx, Host{
|
||||||
|
ID: hostB, Name: "pending-list-host-b", OS: "linux", Arch: "amd64",
|
||||||
|
AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1,
|
||||||
|
EnrolledAt: time.Now().UTC(),
|
||||||
|
}, "tokenhashB", ""); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
gA := makeGroup(t, s, hostA, "default", "01HPENDLISTGRPA000000001")
|
||||||
|
gB := makeGroup(t, s, hostB, "default", "01HPENDLISTGRPB000000001")
|
||||||
|
schedA := "01HPENDLISTSCHEDA0000001"
|
||||||
|
schedB := "01HPENDLISTSCHEDB0000001"
|
||||||
|
if err := s.CreateSchedule(ctx, &Schedule{
|
||||||
|
ID: schedA, HostID: hostA, CronExpr: "@hourly", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gA},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := s.CreateSchedule(ctx, &Schedule{
|
||||||
|
ID: schedB, HostID: hostB, CronExpr: "@hourly", Enabled: true,
|
||||||
|
SourceGroupIDs: []string{gB},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
// Two rows for hostA — one not-yet-due, one already-due — and one
|
||||||
|
// for hostB. ListPendingRunsForHost(A) must return both A rows
|
||||||
|
// (regardless of due-ness) ordered by next_attempt_at ascending.
|
||||||
|
rows := []*PendingRun{
|
||||||
|
{
|
||||||
|
ID: "01HPENDLISTROW0000000A02", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA,
|
||||||
|
NextAttemptAt: now.Add(time.Hour), ScheduledAt: now,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "01HPENDLISTROW0000000A01", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA,
|
||||||
|
NextAttemptAt: now.Add(-time.Minute), ScheduledAt: now.Add(-time.Hour),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "01HPENDLISTROW0000000B01", ScheduleID: schedB, SourceGroupID: gB, HostID: hostB,
|
||||||
|
NextAttemptAt: now, ScheduledAt: now,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, r := range rows {
|
||||||
|
if err := s.EnqueuePendingRun(ctx, r); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := s.ListPendingRunsForHost(ctx, hostA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(out) != 2 {
|
||||||
|
t.Fatalf("len=%d, want 2: %+v", len(out), out)
|
||||||
|
}
|
||||||
|
// Ordered ascending by next_attempt_at: the -1m row first, then +1h.
|
||||||
|
if out[0].ID != "01HPENDLISTROW0000000A01" || out[1].ID != "01HPENDLISTROW0000000A02" {
|
||||||
|
t.Fatalf("order: got %s,%s", out[0].ID, out[1].ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err = s.ListPendingRunsForHost(ctx, "non-existent-host")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(out) != 0 {
|
||||||
|
t.Fatalf("non-existent host: got %d rows", len(out))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user