restic-manager/internal/api/messages.go
steve 608962441b P2-02 (agent side) + P2-03: agent scheduler + schedule.fire dispatch
Closes the schedule reconciliation loop end-to-end.

* New `internal/agent/scheduler` package wraps robfig/cron/v3 with
  the lifecycle the agent needs:
  - Apply(ScheduleSetPayload, Sender) stops the prior cron (waiting
    for in-flight entries to return), rebuilds from scratch, starts,
    and emits schedule.ack with the version we just applied.
  - Disabled entries skipped silently; bad cron exprs (which
    shouldn't reach us — the server validates — but defensive)
    log a warn and skip.
  - On each cron tick the entry sends a new schedule.fire envelope
    to the server with {schedule_id, scheduled_at}. The scheduler
    itself never builds CommandRunPayloads — server is the source
    of truth for jobs.
  - tx is swapped on every Apply, so reconnect is handled
    naturally: cron entries that fire against a dropped tx log
    "no active connection" and skip the tick.
  - Stop() is idempotent and waits for the cron's in-flight
    workers via cron.Stop().Done().

* New wire message api.MsgScheduleFire + api.ScheduleFirePayload
  for the agent → server "I just fired locally" RPC.

* Server-side dispatch (schedule_push.go: dispatchScheduledJob):
  looks up the schedule by id, validates ownership + that it's
  enabled, builds args from kind (paths for backup; other kinds
  are still arg-less in Phase 2 and grow as those job kinds land
  in P2-05..08), persists a jobs row with actor_kind=schedule +
  scheduled_id, and writes command.run back on the same conn so
  the agent runs through its existing dispatch path.

* store.CreateJob now writes scheduled_id. This column was in the
  schema since 0001 but never populated — the original P1 path
  only had operator-driven jobs, so actor_kind was always 'user'
  and scheduled_id was always nil.

* cmd/agent/main.go integration: dispatcher gains a
  *scheduler.Scheduler; the MsgScheduleSet case now hands the
  payload to scheduler.Apply (in a goroutine so the WS read loop
  keeps draining other messages).

* WS dispatcher gains OnScheduleFire alongside OnScheduleAck.

* Tests:
  - scheduler unit tests (4): ack-on-apply, cron tick fires
    schedule.fire envelope, disabled entries don't fire, replace-
    prior-state stops the old cron.
  - Server-side end-to-end: schedule.fire → command.run with the
    right job_id / kind / args, plus jobs row with actor_kind=
    "schedule" and scheduled_id linking back to the schedule.

Persistence of next-fire times across agent restarts is
deliberately deferred. A missed fire window during downtime
simply fires once on reconnect — that's the desirable behaviour
(the operator wants the missed backup to run, not be silently
skipped because we lost track of when it was due).
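
The server-side dispatch bullet above (look up the schedule, check ownership and enabled, build args from kind) might be sketched roughly as follows. This is a minimal illustration, not the code in schedule_push.go: `buildCommandRun`, the in-memory `schedules` map, the `HostID` field and the three error values are all hypothetical names chosen for the example, and the jobs-row persistence step is left out.

```go
package main

import (
	"errors"
	"fmt"
)

type JobKind string

const (
	JobBackup JobKind = "backup"
	JobPrune  JobKind = "prune"
)

// Schedule is trimmed to the fields this sketch needs. HostID is a
// hypothetical stand-in for however the server links a schedule to its host.
type Schedule struct {
	ID      string
	HostID  string
	Kind    JobKind
	Paths   []string
	Enabled bool
}

// CommandRunPayload mirrors the wire type in internal/api/messages.go.
type CommandRunPayload struct {
	JobID string   `json:"job_id"`
	Kind  JobKind  `json:"kind"`
	Args  []string `json:"args,omitempty"`
}

// Hypothetical error values for the three rejection cases.
var (
	errNotFound = errors.New("schedule not found")
	errNotOwned = errors.New("schedule belongs to another host")
	errDisabled = errors.New("schedule is disabled")
)

// buildCommandRun validates a schedule.fire request from hostID and turns
// the schedule into a command.run payload. Only backup carries args (its
// paths); other kinds stay arg-less, as the commit message describes.
func buildCommandRun(schedules map[string]Schedule, hostID, scheduleID, jobID string) (CommandRunPayload, error) {
	s, ok := schedules[scheduleID]
	if !ok {
		return CommandRunPayload{}, errNotFound
	}
	if s.HostID != hostID {
		return CommandRunPayload{}, errNotOwned
	}
	if !s.Enabled {
		return CommandRunPayload{}, errDisabled
	}
	var args []string
	if s.Kind == JobBackup {
		args = append(args, s.Paths...)
	}
	return CommandRunPayload{JobID: jobID, Kind: s.Kind, Args: args}, nil
}

func main() {
	schedules := map[string]Schedule{
		"s1": {ID: "s1", HostID: "h1", Kind: JobBackup, Paths: []string{"/etc", "/home"}, Enabled: true},
	}
	cmd, err := buildCommandRun(schedules, "h1", "s1", "job-1")
	fmt.Println(cmd, err)
}
```

Keeping the argument construction in a pure function like this makes the ownership and enabled checks easy to unit-test without a WebSocket connection or a database.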

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:29:12 +01:00


package api

import (
	"encoding/json"
	"time"
)

// HostOS / HostArch are constrained string types. The store stores them
// raw, but agent metadata collection should populate them from these
// constants so we don't end up with both "linux" and "Linux" rows.
type HostOS string

const (
	OSLinux   HostOS = "linux"
	OSWindows HostOS = "windows"
)

type HostArch string

const (
	ArchAmd64 HostArch = "amd64"
	ArchArm64 HostArch = "arm64"
)

// HelloPayload is the agent's first message after WS auth. The server
// upserts a Host row, marks it online, and (if protocol_version is
// acceptable) responds with a config.update + schedule.set burst.
//
// NOTE: encoding/json's omitempty never omits a struct value, so a zero
// BootTime is still sent as "0001-01-01T00:00:00Z".
type HelloPayload struct {
	ProtocolVersion int       `json:"protocol_version"`
	AgentVersion    string    `json:"agent_version"`
	ResticVersion   string    `json:"restic_version"`
	Hostname        string    `json:"hostname"`
	OS              HostOS    `json:"os"`
	Arch            HostArch  `json:"arch"`
	BootTime        time.Time `json:"boot_time,omitempty"`
}

// HeartbeatPayload is sent by the agent every 30s. It carries no data
// today beyond the send timestamp; presence is the signal. Future
// fields (load, free disk) can land here without bumping
// protocol_version.
type HeartbeatPayload struct {
	SentAt time.Time `json:"sent_at"`
}

// JobKind is the operation an agent is being asked to run, or just ran.
type JobKind string

const (
	JobBackup JobKind = "backup"
	JobInit   JobKind = "init"
	JobForget JobKind = "forget"
	JobPrune  JobKind = "prune"
	JobCheck  JobKind = "check"
	JobUnlock JobKind = "unlock"
)

// JobStatus is the lifecycle state of a job.
type JobStatus string

const (
	JobQueued    JobStatus = "queued"
	JobRunning   JobStatus = "running"
	JobSucceeded JobStatus = "succeeded"
	JobFailed    JobStatus = "failed"
	JobCancelled JobStatus = "cancelled"
)

// CommandRunPayload is the server → agent dispatch for a run-now job.
type CommandRunPayload struct {
	JobID string   `json:"job_id"`
	Kind  JobKind  `json:"kind"`
	Args  []string `json:"args,omitempty"`
}

// CommandCancelPayload is the server → agent cancel signal.
type CommandCancelPayload struct {
	JobID string `json:"job_id"`
}

// CommandResultPayload acks a command.run dispatch (the agent has
// accepted the job and persisted it locally). This is *not* the job
// completion; job.finished signals that.
type CommandResultPayload struct {
	JobID    string `json:"job_id"`
	Accepted bool   `json:"accepted"`
	Error    string `json:"error,omitempty"`
}

// JobStartedPayload reports that the agent has begun execution.
type JobStartedPayload struct {
	JobID     string    `json:"job_id"`
	Kind      JobKind   `json:"kind"`
	StartedAt time.Time `json:"started_at"`
}

// JobProgressPayload is the agent's periodic status while a job is
// running. The field set is chosen to match what restic --json emits
// for `backup`; other kinds populate the subset that makes sense.
type JobProgressPayload struct {
	JobID         string  `json:"job_id"`
	PercentDone   float64 `json:"percent_done"`
	FilesDone     int64   `json:"files_done"`
	TotalFiles    int64   `json:"total_files"`
	BytesDone     int64   `json:"bytes_done"`
	TotalBytes    int64   `json:"total_bytes"`
	ETASeconds    int64   `json:"eta_seconds"`
	ThroughputBps int64   `json:"throughput_bps"`
}

// JobFinishedPayload reports the job's terminal state.
type JobFinishedPayload struct {
	JobID      string          `json:"job_id"`
	Status     JobStatus       `json:"status"`
	ExitCode   int             `json:"exit_code"`
	FinishedAt time.Time       `json:"finished_at"`
	Stats      json.RawMessage `json:"stats,omitempty"` // restic summary blob
	Error      string          `json:"error,omitempty"`
}

// LogStreamLine is one entry of the live job log.
type LogStreamLine struct {
	JobID   string    `json:"job_id"`
	Seq     int64     `json:"seq"`
	TS      time.Time `json:"ts"`
	Stream  LogStream `json:"stream"`
	Payload string    `json:"payload"`
}

// LogStream identifies which channel a log line came from.
type LogStream string

const (
	LogStdout LogStream = "stdout"
	LogStderr LogStream = "stderr"
	LogEvent  LogStream = "event" // parsed restic --json event
)

// SnapshotsReportPayload carries the agent's full snapshot list, sent
// after each successful backup so the server can refresh its projection.
type SnapshotsReportPayload struct {
	Snapshots []Snapshot `json:"snapshots"`
}

// Snapshot is the projection mirrored from `restic snapshots --json`.
// SizeBytes / FileCount come from the embedded summary block on
// restic 0.16+; older clients leave them at zero (the UI degrades
// gracefully).
type Snapshot struct {
	ID        string    `json:"id"`       // long restic snapshot ID
	ShortID   string    `json:"short_id"` // 8-hex-char form
	Time      time.Time `json:"time"`
	Hostname  string    `json:"hostname"`
	Paths     []string  `json:"paths"`
	Tags      []string  `json:"tags,omitempty"`
	SizeBytes int64     `json:"size_bytes,omitempty"`
	FileCount int64     `json:"file_count,omitempty"`
}

// RepoStatsPayload carries periodic repo health facts derived from
// `restic stats` and lock-file inspection.
//
// NOTE: omitempty has no effect on time.Time, so a zero LastCheckAt is
// still serialized rather than omitted.
type RepoStatsPayload struct {
	SizeBytes       int64     `json:"size_bytes"`
	SnapshotCount   int       `json:"snapshot_count"`
	DedupRatio      float64   `json:"dedup_ratio"`
	LastCheckAt     time.Time `json:"last_check_at,omitempty"`
	LastCheckStatus string    `json:"last_check_status,omitempty"`
	LockState       string    `json:"lock_state"` // locked|unlocked
}

// Schedule is the agent-facing view of a Schedule row. (Server-side
// CRUD shapes live in the http handlers; this is what gets pushed.)
type Schedule struct {
	ID              string          `json:"id"`
	Kind            JobKind         `json:"kind"`
	CronExpr        string          `json:"cron_expr"`
	Paths           []string        `json:"paths,omitempty"`
	Excludes        []string        `json:"excludes,omitempty"`
	Tags            []string        `json:"tags,omitempty"`
	RetentionPolicy json.RawMessage `json:"retention_policy,omitempty"`
	Options         json.RawMessage `json:"options,omitempty"`
	PreHook         string          `json:"pre_hook,omitempty"`
	PostHook        string          `json:"post_hook,omitempty"`
	Enabled         bool            `json:"enabled"`
}

// ScheduleSetPayload is the full canonical schedule list the server
// pushes for a host. The agent reconciles its local cron and replies
// with a ScheduleAckPayload carrying the same Version.
type ScheduleSetPayload struct {
	Version   int64      `json:"version"`
	Schedules []Schedule `json:"schedules"`
}

// ScheduleAckPayload confirms that the agent has applied a given version.
type ScheduleAckPayload struct {
	Version   int64     `json:"version"`
	AppliedAt time.Time `json:"applied_at"`
}

// ScheduleFirePayload reports that a local cron entry just fired on the
// agent. The server is expected to look up the schedule, build a
// CommandRun payload from it, persist a job row, and return
// MsgCommandRun on the same connection. ScheduledAt is the wall-clock
// time the agent's cron fired (audit / forensic value when network
// jitter pushes the actual command.run dispatch later).
type ScheduleFirePayload struct {
	ScheduleID  string    `json:"schedule_id"`
	ScheduledAt time.Time `json:"scheduled_at"`
}

// ConfigUpdatePayload is the per-host config the server pushes
// (currently just repo connection details). Empty fields mean "leave
// existing alone"; to clear something, send an explicit zero value.
//
// NOTE: every field is a string tagged omitempty, so an explicit empty
// string is dropped on the wire and looks the same as "unset". Clearing
// a field would need a pointer type or a separate flag.
type ConfigUpdatePayload struct {
	RepoURL        string `json:"repo_url,omitempty"`
	RepoPassword   string `json:"repo_password,omitempty"` // sensitive
	RepoUsername   string `json:"repo_username,omitempty"`
	RepoCredential string `json:"repo_credential,omitempty"` // sensitive (for rest server basic auth)
	HookShell      string `json:"hook_shell,omitempty"`
}

// AgentUpdateAvailablePayload is informational only; the agent does
// NOT self-update. See spec.md §4.2 for the package-manager-based
// update model.
type AgentUpdateAvailablePayload struct {
	LatestVersion string `json:"latest_version"`
	PackageURL    string `json:"package_url"` // apt repo / choco source
	Changelog     string `json:"changelog,omitempty"`
}
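
To make the wire shapes above concrete, here is a small standalone sketch of how three of these payloads serialize with encoding/json. The struct definitions are copied (and trimmed) from the file; the helper names `encodeFire`, `encodeZeroBoot` and `encodeEmptyConfig`, and the sample values, are made up for the example. It also illustrates how `omitempty` behaves differently for `time.Time` and for strings.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ScheduleFirePayload is copied from messages.go.
type ScheduleFirePayload struct {
	ScheduleID  string    `json:"schedule_id"`
	ScheduledAt time.Time `json:"scheduled_at"`
}

// bootOnly keeps just the BootTime field of HelloPayload.
type bootOnly struct {
	BootTime time.Time `json:"boot_time,omitempty"`
}

// configOnly keeps just two fields of ConfigUpdatePayload.
type configOnly struct {
	RepoURL   string `json:"repo_url,omitempty"`
	HookShell string `json:"hook_shell,omitempty"`
}

// encodeFire marshals a schedule.fire payload with a fixed UTC timestamp.
func encodeFire() (string, error) {
	at := time.Date(2026, 5, 2, 3, 0, 0, 0, time.UTC)
	b, err := json.Marshal(ScheduleFirePayload{ScheduleID: "sched-1", ScheduledAt: at})
	return string(b), err
}

// encodeZeroBoot marshals a zero BootTime. omitempty does not drop it,
// because time.Time is a struct and structs are never "empty" to
// encoding/json.
func encodeZeroBoot() (string, error) {
	b, err := json.Marshal(bootOnly{})
	return string(b), err
}

// encodeEmptyConfig marshals explicitly empty strings. omitempty drops
// them, so the receiver cannot tell "clear this field" from "unset".
func encodeEmptyConfig() (string, error) {
	b, err := json.Marshal(configOnly{RepoURL: "", HookShell: ""})
	return string(b), err
}

func main() {
	for _, f := range []func() (string, error){encodeFire, encodeZeroBoot, encodeEmptyConfig} {
		s, err := f()
		if err != nil {
			fmt.Println("marshal error:", err)
			continue
		}
		fmt.Println(s)
	}
}
```

Running it shows the zero `BootTime` still present in the output and the empty config collapsing to `{}`, which is worth keeping in mind when deciding how "clear this field" should be expressed on the wire.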