server: enqueue pending_runs when scheduled-job dispatch fails

When dispatchBackupForGroup's conn.Send errors, queue a pending_runs
row (attempt=1, next_attempt_at = now + group.RetryBackoffSeconds)
instead of silently dropping the fire. The orphaned queued job row
is left behind for forensic visibility — the drainer will create a
fresh job row on its retry.

Also adds Store.ListPendingRunsForHost — the on-reconnect drain
walks every row for the host, regardless of due-ness, since the
host being back makes 'due' irrelevant.
This commit is contained in:
2026-05-03 23:53:57 +01:00
parent 2794d5a821
commit e64cf25c0e
3 changed files with 136 additions and 2 deletions
+75
View File
@@ -219,3 +219,78 @@ func TestPendingRunQueue(t *testing.T) {
t.Fatalf("after delete: %v", due)
}
}
func TestListPendingRunsForHost(t *testing.T) {
t.Parallel()
s := openTestStore(t)
ctx := context.Background()
hostA := makeSchedHost(t, s)
hostB := "01HPENDLISTHOSTB00000001"
if err := s.CreateHost(ctx, Host{
ID: hostB, Name: "pending-list-host-b", OS: "linux", Arch: "amd64",
AgentVersion: "dev", ResticVersion: "0.16.0", ProtocolVersion: 1,
EnrolledAt: time.Now().UTC(),
}, "tokenhashB", ""); err != nil {
t.Fatal(err)
}
gA := makeGroup(t, s, hostA, "default", "01HPENDLISTGRPA000000001")
gB := makeGroup(t, s, hostB, "default", "01HPENDLISTGRPB000000001")
schedA := "01HPENDLISTSCHEDA0000001"
schedB := "01HPENDLISTSCHEDB0000001"
if err := s.CreateSchedule(ctx, &Schedule{
ID: schedA, HostID: hostA, CronExpr: "@hourly", Enabled: true,
SourceGroupIDs: []string{gA},
}); err != nil {
t.Fatal(err)
}
if err := s.CreateSchedule(ctx, &Schedule{
ID: schedB, HostID: hostB, CronExpr: "@hourly", Enabled: true,
SourceGroupIDs: []string{gB},
}); err != nil {
t.Fatal(err)
}
now := time.Now().UTC()
// Two rows for hostA — one not-yet-due, one already-due — and one
// for hostB. ListPendingRunsForHost(A) must return both A rows
// (regardless of due-ness) ordered by next_attempt_at ascending.
rows := []*PendingRun{
{
ID: "01HPENDLISTROW0000000A02", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA,
NextAttemptAt: now.Add(time.Hour), ScheduledAt: now,
},
{
ID: "01HPENDLISTROW0000000A01", ScheduleID: schedA, SourceGroupID: gA, HostID: hostA,
NextAttemptAt: now.Add(-time.Minute), ScheduledAt: now.Add(-time.Hour),
},
{
ID: "01HPENDLISTROW0000000B01", ScheduleID: schedB, SourceGroupID: gB, HostID: hostB,
NextAttemptAt: now, ScheduledAt: now,
},
}
for _, r := range rows {
if err := s.EnqueuePendingRun(ctx, r); err != nil {
t.Fatal(err)
}
}
out, err := s.ListPendingRunsForHost(ctx, hostA)
if err != nil {
t.Fatal(err)
}
if len(out) != 2 {
t.Fatalf("len=%d, want 2: %+v", len(out), out)
}
// Ordered ascending by next_attempt_at: the -1m row first, then +1h.
if out[0].ID != "01HPENDLISTROW0000000A01" || out[1].ID != "01HPENDLISTROW0000000A02" {
t.Fatalf("order: got %s,%s", out[0].ID, out[1].ID)
}
out, err = s.ListPendingRunsForHost(ctx, "non-existent-host")
if err != nil {
t.Fatal(err)
}
if len(out) != 0 {
t.Fatalf("non-existent host: got %d rows", len(out))
}
}