Files
restic-manager/internal/server/http/schedule_push.go
T
steve 0c9ea75046 server: drainer uses dispatch-core to avoid duplicate pending_run enqueue
Extract dispatchBackupForGroupCore (persist+marshal+send, no enqueue on
failure) from dispatchBackupForGroup. drainOne now calls the core
directly so a failed Send only bumps the existing pending_runs row via
BumpPendingRunAttempt — not create a second row — stopping the
geometric duplication on repeated drain failures.

dispatchBackupForGroup (schedule.fire path) wraps the core and keeps
its enqueue-on-failure behaviour unchanged.

TestDrainPendingBumpsOnSendFailure strengthened: asserts exactly 1 row
remains after a send failure (was tolerating >=1 duplicate rows).
2026-05-04 10:19:15 +01:00

265 lines
9.7 KiB
Go

// schedule_push.go — server → agent reconciliation push and the
// inbound schedule.fire dispatch.
//
// The slim-schedule wire shape is built here from the (Schedule,
// SourceGroup) pair. Each schedule is sent with its resolved source
// groups inlined so the agent doesn't have to keep its own copy of
// the group catalog. Cron + enabled drive the agent's local timer;
// when an entry fires the agent ships back a schedule.fire and
// dispatchScheduledJob below resolves the schedule's groups and
// dispatches one backup command.run per group.
package http
import (
"context"
"encoding/json"
"errors"
"log/slog"
"time"
"github.com/oklog/ulid/v2"
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
)
// pushScheduleSetOnConn ships the canonical schedule set straight down
// the freshly-accepted hello connection. Callers are inside the
// hello window — using the conn directly avoids racing the hub's
// register-then-supersede sequence.
func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) {
payload, err := s.buildScheduleSetPayload(ctx, hostID)
if err != nil {
slog.Warn("schedule push: build payload", "host_id", hostID, "err", err)
return
}
env, err := api.Marshal(api.MsgScheduleSet, "", payload)
if err != nil {
slog.Warn("schedule push: marshal payload", "host_id", hostID, "err", err)
return
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := conn.Send(sendCtx, env); err != nil {
slog.Warn("schedule push on-hello: send", "host_id", hostID, "err", err)
}
}
// pushScheduleSetAsync pushes the latest schedule set to a connected
// agent (via the hub) on a best-effort basis. Mutations call this
// after a successful CRUD; offline agents pick the new version up on
// next reconnect via pushScheduleSetOnConn.
func (s *Server) pushScheduleSetAsync(hostID string) {
if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) {
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
payload, err := s.buildScheduleSetPayload(ctx, hostID)
if err != nil {
slog.Warn("schedule push async: build payload", "host_id", hostID, "err", err)
return
}
env, err := api.Marshal(api.MsgScheduleSet, "", payload)
if err != nil {
slog.Warn("schedule push async: marshal", "host_id", hostID, "err", err)
return
}
if err := s.deps.Hub.Send(ctx, hostID, env); err != nil {
slog.Debug("schedule push async: send", "host_id", hostID, "err", err)
}
}()
}
// buildScheduleSetPayload assembles the canonical wire shape: every
// schedule for the host with its source groups resolved inline.
func (s *Server) buildScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) {
version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID)
if err != nil {
return api.ScheduleSetPayload{}, err
}
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
if err != nil {
return api.ScheduleSetPayload{}, err
}
groups, err := s.deps.Store.ListSourceGroupsByHost(ctx, hostID)
if err != nil {
return api.ScheduleSetPayload{}, err
}
groupByID := make(map[string]store.SourceGroup, len(groups))
for _, g := range groups {
groupByID[g.ID] = g
}
out := api.ScheduleSetPayload{Version: version, Schedules: make([]api.Schedule, 0, len(schedules))}
for _, sc := range schedules {
entry := api.Schedule{
ID: sc.ID,
CronExpr: sc.CronExpr,
Enabled: sc.Enabled,
SourceGroups: make([]api.ScheduleSourceGroup, 0, len(sc.SourceGroupIDs)),
}
for _, gid := range sc.SourceGroupIDs {
g, ok := groupByID[gid]
if !ok {
continue
}
retention, _ := json.Marshal(g.RetentionPolicy)
entry.SourceGroups = append(entry.SourceGroups, api.ScheduleSourceGroup{
Name: g.Name,
Includes: g.Includes,
Excludes: g.Excludes,
RetentionPolicy: retention,
RetryMax: g.RetryMax,
RetryBackoffSeconds: g.RetryBackoffSeconds,
})
}
out.Schedules = append(out.Schedules, entry)
}
return out, nil
}
// applyScheduleAck persists the version the agent has confirmed.
func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) {
if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil {
slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "err", err)
}
}
// dispatchScheduledJob handles an agent's schedule.fire. Resolves the
// schedule's source groups and dispatches one backup command.run per
// group, persisting each as a job row with actor_kind=schedule and
// scheduled_id pointing at the schedule.
func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) {
sc, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
slog.Info("schedule.fire: schedule unknown, ignoring",
"host_id", hostID, "schedule_id", scheduleID)
return
}
slog.Warn("schedule.fire: load schedule", "host_id", hostID, "err", err)
return
}
if !sc.Enabled {
slog.Info("schedule.fire: schedule disabled, ignoring",
"host_id", hostID, "schedule_id", scheduleID)
return
}
if len(sc.SourceGroupIDs) == 0 {
slog.Warn("schedule.fire: schedule has no source groups",
"host_id", hostID, "schedule_id", scheduleID)
return
}
for _, gid := range sc.SourceGroupIDs {
g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid)
if err != nil {
slog.Warn("schedule.fire: load source group",
"host_id", hostID, "schedule_id", scheduleID, "group_id", gid, "err", err)
continue
}
s.dispatchBackupForGroup(ctx, conn, hostID, scheduleID, g, scheduledAt)
}
}
// dispatchBackupForGroupCore persists a backup job row, marshals and
// sends the command.run envelope, and audit-logs the dispatch. It does
// NOT enqueue a PendingRun on failure — that responsibility belongs to
// the caller when appropriate.
//
// Returns (jobID, nil) on success. Returns ("", err) on any failure;
// the error is also slog.Warn-ed inside this function so callers don't
// need to log it again.
//
// Used by both dispatchBackupForGroup (schedule.fire path, which adds
// enqueue-on-failure) and drainOne (which handles failure via
// BumpPendingRunAttempt on the existing row, avoiding double-enqueue).
func (s *Server) dispatchBackupForGroupCore(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) (string, error) {
jobID := ulid.Make().String()
now := time.Now().UTC()
scheduleRef := scheduleID
if err := s.deps.Store.CreateJob(ctx, store.Job{
ID: jobID,
HostID: hostID,
Kind: string(api.JobBackup),
ScheduledID: &scheduleRef,
ActorKind: "schedule",
CreatedAt: now,
}); err != nil {
slog.Warn("schedule.fire: persist job", "host_id", hostID,
"schedule_id", scheduleID, "group", g.Name, "err", err)
return "", err
}
// Backup ignores RetentionPolicy — the forget cadence lives on
// host_repo_maintenance and is driven by the server-side ticker
// (P2R-06). Don't ship the field on backup dispatches.
env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{
JobID: jobID,
Kind: api.JobBackup,
Includes: g.Includes,
Excludes: g.Excludes,
Tag: g.Name,
})
if err != nil {
slog.Warn("schedule.fire: marshal command.run",
"host_id", hostID, "schedule_id", scheduleID, "err", err)
return "", err
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := conn.Send(sendCtx, env); err != nil {
slog.Warn("schedule.fire: send command.run failed",
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", err)
// The job row was already persisted — leave it in `queued` status.
// The drainer will re-dispatch (creating a new job row) and the
// orphaned queued row stays for forensic visibility.
return "", err
}
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
ID: ulid.Make().String(),
Actor: "schedule",
Action: "job.run_now",
TargetKind: ptr("job"),
TargetID: &jobID,
TS: now,
})
slog.Info("schedule.fire: dispatched backup",
"host_id", hostID, "schedule_id", scheduleID,
"group", g.Name, "job_id", jobID, "scheduled_at", scheduledAt)
return jobID, nil
}
// dispatchBackupForGroup is the schedule.fire entry point. Wraps
// dispatchBackupForGroupCore with enqueue-on-failure: a failed Send
// queues a fresh PendingRun for the drainer to retry later.
//
// Returns the persisted job ID on success, or "" on any failure.
func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string {
jobID, err := s.dispatchBackupForGroupCore(ctx, conn, hostID, scheduleID, g, scheduledAt)
if err == nil {
return jobID
}
// Send (or an earlier step) failed — err was already logged inside
// the core. Enqueue a fresh PendingRun for the drainer to retry.
backoff := time.Duration(g.RetryBackoffSeconds) * time.Second
if backoff <= 0 {
backoff = 60 * time.Second
}
if enqueueErr := s.deps.Store.EnqueuePendingRun(ctx, &store.PendingRun{
ID: ulid.Make().String(),
ScheduleID: scheduleID,
SourceGroupID: g.ID,
HostID: hostID,
Attempt: 1,
NextAttemptAt: time.Now().UTC().Add(backoff),
ScheduledAt: scheduledAt,
LastError: err.Error(),
}); enqueueErr != nil {
slog.Warn("schedule.fire: enqueue pending run failed",
"host_id", hostID, "schedule_id", scheduleID, "group", g.Name, "err", enqueueErr)
}
return ""
}