Files
restic-manager/internal/store/pending.go
T
steve e64cf25c0e server: enqueue pending_runs when scheduled-job dispatch fails
When dispatchBackupForGroup's conn.Send errors, queue a pending_runs
row (attempt=1, next_attempt_at = now + group.RetryBackoffSeconds)
instead of silently dropping the fire. The orphaned queued job row
is left behind for forensic visibility — the drainer will create a
fresh job row on its retry.

Also adds Store.ListPendingRunsForHost — the on-reconnect drain
walks every row for the host, regardless of due-ness, since the
host being back makes 'due' irrelevant.
2026-05-04 10:19:15 +01:00

141 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"context"
"errors"
"fmt"
"time"
)
// EnqueuePendingRun queues a missed cron tick for the offline-retry
// ticker to dispatch later. Caller (the schedule firing path) sets
// next_attempt_at = now + group.retry_backoff_seconds * 2^(attempt-1).
//
// ID, ScheduleID, SourceGroupID and HostID are required. Unset fields
// are defaulted in place on p before the insert, so after a successful
// call p holds the values that were persisted:
//   - an Attempt below 1 becomes 1;
//   - a zero NextAttemptAt becomes the current time (due immediately);
//   - a zero ScheduledAt becomes the current time.
//
// Both time defaults come from one clock reading, so a fully defaulted
// row never has scheduled_at later than next_attempt_at. Timestamps are
// written as UTC time.RFC3339Nano strings.
func (st *Store) EnqueuePendingRun(ctx context.Context, p *PendingRun) error {
	if p == nil {
		return errors.New("store: pending run required")
	}
	if p.ID == "" || p.ScheduleID == "" || p.SourceGroupID == "" || p.HostID == "" {
		return errors.New("store: pending run id, schedule_id, source_group_id, host_id required")
	}
	// Single clock read so the two defaulted timestamps agree exactly.
	now := time.Now().UTC()
	if p.Attempt < 1 {
		p.Attempt = 1
	}
	if p.NextAttemptAt.IsZero() {
		p.NextAttemptAt = now
	}
	if p.ScheduledAt.IsZero() {
		p.ScheduledAt = now
	}
	_, err := st.db.ExecContext(ctx,
		`INSERT INTO pending_runs (id, schedule_id, source_group_id, host_id,
attempt, next_attempt_at, scheduled_at, last_error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		p.ID, p.ScheduleID, p.SourceGroupID, p.HostID,
		p.Attempt,
		p.NextAttemptAt.UTC().Format(time.RFC3339Nano),
		p.ScheduledAt.UTC().Format(time.RFC3339Nano),
		nullableString(p.LastError))
	if err != nil {
		return fmt.Errorf("store: enqueue pending run: %w", err)
	}
	return nil
}
// DuePendingRuns returns up to limit rows whose next_attempt_at <= now,
// ordered oldest first. Server-side ticker calls this every ~30s.
//
// The due check and the ordering compare the stored text timestamps.
// NOTE(review): time.RFC3339Nano drops trailing fractional zeros, so two
// instants within the same second can compare out of chronological order
// as strings (e.g. "…:15Z" sorts after "…:15.5Z"). A row may therefore be
// reported due up to a second early or late; a fixed-width timestamp
// format on write would remove this.
//
// A next_attempt_at or scheduled_at value that fails to parse leaves that
// field at the zero time instead of failing the whole batch. On error the
// returned slice is nil.
func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) ([]PendingRun, error) {
	rows, err := st.db.QueryContext(ctx,
		`SELECT id, schedule_id, source_group_id, host_id, attempt,
next_attempt_at, scheduled_at, COALESCE(last_error, '')
FROM pending_runs
WHERE next_attempt_at <= ?
ORDER BY next_attempt_at
LIMIT ?`,
		now.UTC().Format(time.RFC3339Nano), limit)
	if err != nil {
		return nil, fmt.Errorf("store: due pending runs: %w", err)
	}
	defer func() { _ = rows.Close() }()
	out := []PendingRun{}
	for rows.Next() {
		var p PendingRun
		var nextAt, scheduledAt string
		if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID,
			&p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil {
			return nil, fmt.Errorf("store: scan due pending run: %w", err)
		}
		// Best-effort parse: a malformed timestamp should not block
		// the other due rows from being retried.
		if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil {
			p.NextAttemptAt = t
		}
		if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil {
			p.ScheduledAt = t
		}
		out = append(out, p)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("store: due pending runs: %w", err)
	}
	return out, nil
}
// ListPendingRunsForHost returns every pending row for the host
// (regardless of next_attempt_at), ordered by next_attempt_at
// ascending. Used by the on-reconnect drain — when a host comes
// back, we walk every pending row for it, not just the due ones,
// because the host being back makes "due" unimportant: every row
// is dispatchable now.
//
// Ordering compares the stored text timestamps. NOTE(review):
// time.RFC3339Nano drops trailing fractional zeros, so rows whose
// next_attempt_at falls within the same second may come back slightly
// out of chronological order.
//
// A timestamp that fails to parse leaves that field at the zero time
// instead of failing the whole listing. On error the returned slice is
// nil.
func (st *Store) ListPendingRunsForHost(ctx context.Context, hostID string) ([]PendingRun, error) {
	rows, err := st.db.QueryContext(ctx,
		`SELECT id, schedule_id, source_group_id, host_id, attempt,
next_attempt_at, scheduled_at, COALESCE(last_error, '')
FROM pending_runs
WHERE host_id = ?
ORDER BY next_attempt_at`,
		hostID)
	if err != nil {
		return nil, fmt.Errorf("store: list pending runs for host: %w", err)
	}
	defer func() { _ = rows.Close() }()
	out := []PendingRun{}
	for rows.Next() {
		var p PendingRun
		var nextAt, scheduledAt string
		if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID,
			&p.Attempt, &nextAt, &scheduledAt, &p.LastError); err != nil {
			return nil, fmt.Errorf("store: scan pending run for host: %w", err)
		}
		// Best-effort parse: one malformed timestamp should not hide
		// the host's other pending rows from the drain.
		if t, err := time.Parse(time.RFC3339Nano, nextAt); err == nil {
			p.NextAttemptAt = t
		}
		if t, err := time.Parse(time.RFC3339Nano, scheduledAt); err == nil {
			p.ScheduledAt = t
		}
		out = append(out, p)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("store: list pending runs for host: %w", err)
	}
	return out, nil
}
// DeletePendingRun removes the pending_runs row with the given id. It is
// called once a run has been dispatched successfully or has exhausted
// retry_max. The statement's result is not inspected, so deleting an id
// that matches no row is not reported as an error.
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
	const query = `DELETE FROM pending_runs WHERE id = ?`
	if _, execErr := st.db.ExecContext(ctx, query, id); execErr != nil {
		return fmt.Errorf("store: delete pending run: %w", execErr)
	}
	return nil
}
// BumpPendingRunAttempt records another failed retry for the row with
// the given id: it adds one to attempt and overwrites next_attempt_at and
// last_error. The caller has already decided the run should be tried
// again.
//
// nextAttemptAt is stored as a UTC time.RFC3339Nano string; lastError is
// bound through nullableString (presumably NULL when empty — confirm).
// The statement's result is not inspected, so an id that matches no row
// is not reported as an error.
func (st *Store) BumpPendingRunAttempt(ctx context.Context, id string, nextAttemptAt time.Time, lastError string) error {
	const query = `UPDATE pending_runs SET
attempt = attempt + 1,
next_attempt_at = ?,
last_error = ?
WHERE id = ?`
	retryAt := nextAttemptAt.UTC().Format(time.RFC3339Nano)
	errText := nullableString(lastError)
	if _, execErr := st.db.ExecContext(ctx, query, retryAt, errText, id); execErr != nil {
		return fmt.Errorf("store: bump pending run: %w", execErr)
	}
	return nil
}