From e64cf25c0ed07ec2cdee58f3ea1eedb00d780349 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Sun, 3 May 2026 23:53:57 +0100 Subject: [PATCH] server: enqueue pending_runs when scheduled-job dispatch fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When dispatchBackupForGroup's conn.Send errors, queue a pending_runs row (attempt=1, next_attempt_at = now + group.RetryBackoffSeconds) instead of silently dropping the fire. The orphaned queued job row is left behind for forensic visibility — the drainer will create a fresh job row on its retry. Also adds Store.ListPendingRunsForHost — the on-reconnect drain walks every row for the host, regardless of due-ness, since the host being back makes 'due' irrelevant. --- internal/server/http/schedule_push.go | 26 +++++++++- internal/store/pending.go | 37 +++++++++++++ internal/store/sources_test.go | 75 +++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 2 deletions(-) diff --git a/internal/server/http/schedule_push.go b/internal/server/http/schedule_push.go index 02692b7..42fadfe 100644 --- a/internal/server/http/schedule_push.go +++ b/internal/server/http/schedule_push.go @@ -206,8 +206,30 @@ func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, host sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.Send(sendCtx, env); err != nil { - slog.Warn("schedule.fire: send command.run", - "host_id", hostID, "schedule_id", scheduleID, "err", err) + slog.Warn("schedule.fire: send command.run failed, queueing for retry", + "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err) + backoff := time.Duration(g.RetryBackoffSeconds) * time.Second + if backoff <= 0 { + backoff = 60 * time.Second + } + if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{ + ID: ulid.Make().String(), + ScheduleID: scheduleID, + SourceGroupID: g.ID, + HostID: hostID, + Attempt: 1, + NextAttemptAt: time.Now().UTC().Add(backoff), + ScheduledAt: scheduledAt, + LastError: err.Error(), + }); enqueueErr != nil { + slog.Warn("schedule.fire: enqueue pending run failed", + "host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr) + } + // The job row was already persisted earlier in this function — leave + // it in `queued` status. The drainer will re-dispatch (creating a + // new job row) and the orphaned queued row stays for forensic + // visibility. Don't delete it: the audit trail still wants to know + // "we tried and the wire was wedged." return "" } _ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{ diff --git a/internal/store/pending.go b/internal/store/pending.go index 42f2fdd..097d336 100644 --- a/internal/store/pending.go +++ b/internal/store/pending.go @@ -72,6 +72,43 @@ func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) ( return out, rows.Err() } +// ListPendingRunsForHost returns every pending row for the host +// (regardless of next_attempt_at), ordered by next_attempt_at +// ascending. Used by the on-reconnect drain — when a host comes +// back, we walk every pending row for it, not just the due ones, +// because the host being back makes "due" unimportant: every row +// is dispatchable now. +func (st *Store) ListPendingRunsForHost(ctx context.Context, hostID string) ([]PendingRun, error) { + rows, err := st.db.QueryContext(ctx, + `SELECT id, schedule_id, source_group_id, host_id, attempt, + next_attempt_at, scheduled_at, COALESCE(last_error, '') + FROM pending_runs + WHERE host_id = ? + ORDER BY next_attempt_at`, + hostID) + if err != nil { + return nil, fmt.Errorf("store: list pending runs for host: %w", err) + } + defer func() { _ = rows.Close() }() + out := []PendingRun{} + for rows.Next() { + var p PendingRun + var nextAt, scheduledAt string + if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID, + &p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil { + return nil, err + } + if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil { + p.NextAttemptAt = t + } + if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil { + p.ScheduledAt = t + } + out = append(out, p) + } + return out, rows.Err() +} + // DeletePendingRun removes a row by id. Called after successful // dispatch or after exceeding retry_max. func (st *Store) DeletePendingRun(ctx context.Context, id string) error { diff --git a/internal/store/sources_test.go b/internal/store/sources_test.go index 2f6d7bb..28cdf9b 100644 --- a/internal/store/sources_test.go +++ b/internal/store/sources_test.go @@ -219,3 +219,78 @@ func TestPendingRunQueue(t *testing.T) { t.Fatalf("after delete: %v", due) } } + +func TestListPendingRunsForHost(t *testing.T) { + t.Parallel() + s := openTestStore(t) + ctx := context.Background() + hostA := makeSchedHost(t, s) + hostB := "01HPENDLISTHOSTB00000001" + if err := s.CreateHost(ctx, Host{ + ID: hostB, Name: "pending-list-host-b", OS: "linux", Arch: "amd64", + AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1, + EnrolledAt: time.Now().UTC(), + }, "tokenhashB", ""); err != nil { + t.Fatal(err) + } + gA := makeGroup(t, s, hostA, "default", "01HPENDLISTGRPA000000001") + gB := makeGroup(t, s, hostB, "default", "01HPENDLISTGRPB000000001") + schedA := "01HPENDLISTSCHEDA0000001" + schedB := "01HPENDLISTSCHEDB0000001" + if err := s.CreateSchedule(ctx, &Schedule{ + ID: schedA, HostID: hostA, CronExpr: "@hourly", Enabled: true, + SourceGroupIDs: []string{gA}, + }); err != nil { + t.Fatal(err) + } + if err := s.CreateSchedule(ctx, &Schedule{ + ID: schedB, HostID: hostB, CronExpr: "@hourly", Enabled: true, + SourceGroupIDs: []string{gB}, + }); err != nil { + t.Fatal(err) + } + + now := time.Now().UTC() + // Two rows for hostA — one not-yet-due, one already-due — and one + // for hostB. ListPendingRunsForHost(A) must return both A rows + // (regardless of due-ness) ordered by next_attempt_at ascending. + rows := []*PendingRun{ + { + ID: "01HPENDLISTROW0000000A02", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA, + NextAttemptAt: now.Add(time.Hour), ScheduledAt: now, + }, + { + ID: "01HPENDLISTROW0000000A01", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA, + NextAttemptAt: now.Add(-time.Minute), ScheduledAt: now.Add(-time.Hour), + }, + { + ID: "01HPENDLISTROW0000000B01", ScheduleID: schedB, SourceGroupID: gB, HostID: hostB, + NextAttemptAt: now, ScheduledAt: now, + }, + } + for _, r := range rows { + if err := s.EnqueuePendingRun(ctx, r); err != nil { + t.Fatal(err) + } + } + + out, err := s.ListPendingRunsForHost(ctx, hostA) + if err != nil { + t.Fatal(err) + } + if len(out) != 2 { + t.Fatalf("len=%d, want 2: %+v", len(out), out) + } + // Ordered ascending by next_attempt_at: the -1m row first, then +1h. + if out[0].ID != "01HPENDLISTROW0000000A01" || out[1].ID != "01HPENDLISTROW0000000A02" { + t.Fatalf("order: got %s,%s", out[0].ID, out[1].ID) + } + + out, err = s.ListPendingRunsForHost(ctx, "non-existent-host") + if err != nil { + t.Fatal(err) + } + if len(out) != 0 { + t.Fatalf("non-existent host: got %d rows", len(out)) + } +}