package store

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pendingRunTimeLayout is the on-disk format for the pending_runs
// timestamp columns (next_attempt_at, scheduled_at). It is RFC 3339 with
// a fixed nine-digit fraction. time.RFC3339Nano trims trailing zeros
// ("…:05Z" vs "…:05.5Z"), so its strings have varying widths and the
// lexical comparisons done in SQL (next_attempt_at <= ?, ORDER BY
// next_attempt_at) can disagree with chronological order within a
// second. A fixed width makes lexical order equal chronological order.
// Values are always rendered in UTC, so the zone is always "Z".
//
// time.Parse(time.RFC3339Nano, …) reads both this form and the older
// variable-width form. Rows written in the older form can only
// mis-compare against new values that fall in the same second.
const pendingRunTimeLayout = "2006-01-02T15:04:05.000000000Z07:00"

// formatPendingRunTime renders t in UTC using pendingRunTimeLayout.
func formatPendingRunTime(t time.Time) string {
	return t.UTC().Format(pendingRunTimeLayout)
}

// parsePendingRunTime decodes a stored pending_runs timestamp.
// Decoding is best-effort: an unparsable value yields the zero
// time.Time rather than failing the whole listing.
func parsePendingRunTime(s string) time.Time {
	t, err := time.Parse(time.RFC3339Nano, s)
	if err != nil {
		return time.Time{}
	}
	return t
}

// EnqueuePendingRun queues a missed cron tick for the offline-retry
// ticker to dispatch later. Caller (the schedule firing path) sets
// next_attempt_at = now + group.retry_backoff_seconds × 2^(attempt-1).
//
// ID, ScheduleID, SourceGroupID and HostID are required. Zero-valued
// fields are defaulted in place on p before the insert:
//   - Attempt becomes 1.
//   - NextAttemptAt and ScheduledAt both become the current UTC time,
//     so a defaulted row is immediately picked up by DuePendingRuns.
//
// LastError is passed through nullableString (presumably stored as
// NULL when empty — confirm against that helper).
func (st *Store) EnqueuePendingRun(ctx context.Context, p *PendingRun) error {
	if p == nil {
		return errors.New("store: pending run is nil")
	}
	if p.ID == "" || p.ScheduleID == "" || p.SourceGroupID == "" || p.HostID == "" {
		return errors.New("store: pending run id, schedule_id, source_group_id, host_id required")
	}

	// One clock read so both defaulted timestamps are identical.
	now := time.Now().UTC()
	if p.Attempt == 0 {
		p.Attempt = 1
	}
	if p.NextAttemptAt.IsZero() {
		p.NextAttemptAt = now
	}
	if p.ScheduledAt.IsZero() {
		p.ScheduledAt = now
	}

	_, err := st.db.ExecContext(ctx,
		`INSERT INTO pending_runs
		   (id, schedule_id, source_group_id, host_id, attempt, next_attempt_at, scheduled_at, last_error)
		 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		p.ID, p.ScheduleID, p.SourceGroupID, p.HostID, p.Attempt,
		formatPendingRunTime(p.NextAttemptAt), formatPendingRunTime(p.ScheduledAt),
		nullableString(p.LastError))
	if err != nil {
		return fmt.Errorf("store: enqueue pending run: %w", err)
	}
	return nil
}

// queryPendingRuns runs a SELECT over pending_runs and decodes every row.
//   - tail is the trailing clause (WHERE / ORDER BY / LIMIT); it must be
//     a constant SQL fragment, with all values bound through args.
//   - op names the calling operation and prefixes every returned error.
//
// On success it returns a non-nil, possibly empty, slice. On failure it
// returns a nil slice.
func (st *Store) queryPendingRuns(ctx context.Context, op, tail string, args ...any) ([]PendingRun, error) {
	rows, err := st.db.QueryContext(ctx,
		`SELECT id, schedule_id, source_group_id, host_id, attempt,
		        next_attempt_at, scheduled_at, COALESCE(last_error, '')
		   FROM pending_runs `+tail, args...)
	if err != nil {
		return nil, fmt.Errorf("store: %s: %w", op, err)
	}
	// A Close error carries nothing actionable here: either iteration
	// finished (rows.Err is checked below) or we are already returning
	// an error.
	defer func() { _ = rows.Close() }()

	out := []PendingRun{}
	for rows.Next() {
		var p PendingRun
		var nextAt, scheduledAt string
		if err := rows.Scan(&p.ID, &p.ScheduleID, &p.SourceGroupID, &p.HostID, &p.Attempt,
			&nextAt, &scheduledAt, &p.LastError); err != nil {
			return nil, fmt.Errorf("store: %s: scan: %w", op, err)
		}
		p.NextAttemptAt = parsePendingRunTime(nextAt)
		p.ScheduledAt = parsePendingRunTime(scheduledAt)
		out = append(out, p)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("store: %s: %w", op, err)
	}
	return out, nil
}

// DuePendingRuns returns at most limit rows whose next_attempt_at <= now,
// ordered oldest first. Server-side ticker calls this every ~30s.
//
// limit is passed straight through to SQL LIMIT, so the behavior of a
// non-positive value is up to the database.
func (st *Store) DuePendingRuns(ctx context.Context, now time.Time, limit int) ([]PendingRun, error) {
	return st.queryPendingRuns(ctx, "due pending runs",
		`WHERE next_attempt_at <= ? ORDER BY next_attempt_at LIMIT ?`,
		formatPendingRunTime(now), limit)
}

// ListPendingRunsForHost returns every pending row for the host,
// regardless of next_attempt_at, ordered by next_attempt_at ascending.
//
// Used by the on-reconnect drain. When a host comes back, we walk every
// pending row for it, not just the due ones: with the host reachable
// again, "due" no longer matters and every row is dispatchable now.
func (st *Store) ListPendingRunsForHost(ctx context.Context, hostID string) ([]PendingRun, error) {
	return st.queryPendingRuns(ctx, "list pending runs for host",
		`WHERE host_id = ? ORDER BY next_attempt_at`, hostID)
}

// DeletePendingRun removes a row by id. Called after successful
// dispatch or after exceeding retry_max.
//
// The affected-row count is not inspected, so deleting an id that does
// not exist is not reported as an error.
func (st *Store) DeletePendingRun(ctx context.Context, id string) error {
	_, err := st.db.ExecContext(ctx, `DELETE FROM pending_runs WHERE id = ?`, id)
	if err != nil {
		return fmt.Errorf("store: delete pending run: %w", err)
	}
	return nil
}

// BumpPendingRunAttempt increments the attempt counter and sets
// next_attempt_at and last_error. Used after a failed retry, once the
// caller has decided to try again.
//
// nextAttemptAt is stored in UTC using the same fixed-width layout as
// every other pending_runs writer, so DuePendingRuns compares it
// correctly. As with DeletePendingRun, an unknown id is not reported as
// an error.
func (st *Store) BumpPendingRunAttempt(ctx context.Context, id string, nextAttemptAt time.Time, lastError string) error {
	_, err := st.db.ExecContext(ctx,
		`UPDATE pending_runs
		    SET attempt = attempt + 1, next_attempt_at = ?, last_error = ?
		  WHERE id = ?`,
		formatPendingRunTime(nextAttemptAt), nullableString(lastError), id)
	if err != nil {
		return fmt.Errorf("store: bump pending run: %w", err)
	}
	return nil
}