6450bf1b88
Closes the schedule reconciliation loop end-to-end.
* New `internal/agent/scheduler` package wraps robfig/cron/v3 with
  the lifecycle the agent needs:
  - Apply(ScheduleSetPayload, Sender) stops the prior cron (waiting
    for in-flight entries to return), rebuilds it from scratch,
    starts it, and emits schedule.ack with the version we just
    applied.
  - Disabled entries are skipped silently. Bad cron expressions
    shouldn't reach us (the server validates them), but as a
    defensive measure they log a warning and are skipped.
  - On each cron tick the entry sends a new schedule.fire envelope
    to the server with {schedule_id, scheduled_at}. The scheduler
    itself never builds CommandRunPayloads; the server is the source
    of truth for jobs.
  - tx is swapped on every Apply, so reconnects are handled
    naturally: cron entries that fire against a dropped tx log
    "no active connection" and skip the tick.
  - Stop() is idempotent and waits for the cron's in-flight
    workers via cron.Stop().Done().
* New wire message api.MsgScheduleFire + api.ScheduleFirePayload
  for the agent → server "I just fired locally" RPC.
* Server-side dispatch (schedule_push.go: dispatchScheduledJob):
  looks up the schedule by id, validates ownership + that it's
  enabled, builds args from kind (paths for backup; other kinds
  remain arg-less in Phase 2 and gain args as those job kinds land
  in P2-05..08), persists a jobs row with actor_kind=schedule +
  scheduled_id, and writes command.run back on the same conn so
  the agent runs the job through its existing dispatch path.
* store.CreateJob now writes scheduled_id. This column has been in
  the schema since 0001 but was never populated: the original P1
  path only had operator-driven jobs, so actor_kind was always
  'user' and scheduled_id was always nil.
* cmd/agent/main.go integration: the dispatcher gains a
  *scheduler.Scheduler; the MsgScheduleSet case now hands the
  payload to scheduler.Apply (in a goroutine so the WS read loop
  keeps draining other messages).
* WS dispatcher gains OnScheduleFire alongside OnScheduleAck.
* Tests:
  - scheduler unit tests (4): ack-on-apply, cron tick fires a
    schedule.fire envelope, disabled entries don't fire, and
    replace-prior-state stops the old cron.
  - Server-side end-to-end: schedule.fire → command.run with the
    right job_id / kind / args, plus a jobs row with
    actor_kind="schedule" and scheduled_id linking back to the
    schedule.
Persistence of next-fire times across agent restarts is
deliberately deferred. A missed fire window during downtime
simply fires once on reconnect — that's the desirable behaviour
(the operator wants the missed backup to run, not be silently
skipped because we lost track of when it was due).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
230 lines
8.0 KiB
Go
package api

import (
	"encoding/json"
	"time"
)

// HostOS / HostArch are constrained string types. The store stores them
// raw, but agent metadata collection should populate them from these
// constants so we don't end up with both "linux" and "Linux" rows.
type HostOS string

const (
	OSLinux   HostOS = "linux"
	OSWindows HostOS = "windows"
)

type HostArch string

const (
	ArchAmd64 HostArch = "amd64"
	ArchArm64 HostArch = "arm64"
)

// HelloPayload is the agent's first message after WS auth. The server
// upserts a Host row, marks it online, and (if protocol_version is
// acceptable) responds with a config.update + schedule.set burst.
type HelloPayload struct {
	ProtocolVersion int       `json:"protocol_version"`
	AgentVersion    string    `json:"agent_version"`
	ResticVersion   string    `json:"restic_version"`
	Hostname        string    `json:"hostname"`
	OS              HostOS    `json:"os"`
	Arch            HostArch  `json:"arch"`
	BootTime        time.Time `json:"boot_time,omitempty"`
}

// HeartbeatPayload is sent by the agent every 30s. It carries no data
// today; presence is the signal. Future fields (load, free disk) can
// land here without bumping protocol_version.
type HeartbeatPayload struct {
	SentAt time.Time `json:"sent_at"`
}

// JobKind is the operation an agent is being asked to run, or just ran.
type JobKind string

const (
	JobBackup JobKind = "backup"
	JobInit   JobKind = "init"
	JobForget JobKind = "forget"
	JobPrune  JobKind = "prune"
	JobCheck  JobKind = "check"
	JobUnlock JobKind = "unlock"
)

// JobStatus is the lifecycle state of a job.
type JobStatus string

const (
	JobQueued    JobStatus = "queued"
	JobRunning   JobStatus = "running"
	JobSucceeded JobStatus = "succeeded"
	JobFailed    JobStatus = "failed"
	JobCancelled JobStatus = "cancelled"
)

// CommandRunPayload is the server → agent dispatch for a run-now job.
type CommandRunPayload struct {
	JobID string   `json:"job_id"`
	Kind  JobKind  `json:"kind"`
	Args  []string `json:"args,omitempty"`
}

// CommandCancelPayload is the server → agent cancel signal.
type CommandCancelPayload struct {
	JobID string `json:"job_id"`
}

// CommandResultPayload acks a command.run dispatch (the agent has
// accepted the job and persisted it locally) — this is *not* the job
// completion. job.finished signals that.
type CommandResultPayload struct {
	JobID    string `json:"job_id"`
	Accepted bool   `json:"accepted"`
	Error    string `json:"error,omitempty"`
}

// JobStartedPayload — agent has begun execution.
type JobStartedPayload struct {
	JobID     string    `json:"job_id"`
	Kind      JobKind   `json:"kind"`
	StartedAt time.Time `json:"started_at"`
}

// JobProgressPayload — agent's periodic status while a job is running.
// Field set chosen to match what restic --json emits for `backup`;
// other kinds populate the subset that makes sense.
type JobProgressPayload struct {
	JobID         string  `json:"job_id"`
	PercentDone   float64 `json:"percent_done"`
	FilesDone     int64   `json:"files_done"`
	TotalFiles    int64   `json:"total_files"`
	BytesDone     int64   `json:"bytes_done"`
	TotalBytes    int64   `json:"total_bytes"`
	ETASeconds    int64   `json:"eta_seconds"`
	ThroughputBps int64   `json:"throughput_bps"`
}

// JobFinishedPayload — agent reports terminal state.
type JobFinishedPayload struct {
	JobID      string          `json:"job_id"`
	Status     JobStatus       `json:"status"`
	ExitCode   int             `json:"exit_code"`
	FinishedAt time.Time       `json:"finished_at"`
	Stats      json.RawMessage `json:"stats,omitempty"` // restic summary blob
	Error      string          `json:"error,omitempty"`
}

// LogStreamLine is one entry of the live job log.
type LogStreamLine struct {
	JobID   string    `json:"job_id"`
	Seq     int64     `json:"seq"`
	TS      time.Time `json:"ts"`
	Stream  LogStream `json:"stream"`
	Payload string    `json:"payload"`
}

// LogStream identifies which channel a log line came from.
type LogStream string

const (
	LogStdout LogStream = "stdout"
	LogStderr LogStream = "stderr"
	LogEvent  LogStream = "event" // parsed restic --json event
)

// SnapshotsReportPayload — agent dumps its full snapshot list after
// each successful backup, so the server can refresh its projection.
type SnapshotsReportPayload struct {
	Snapshots []Snapshot `json:"snapshots"`
}

// Snapshot is the projection mirrored from `restic snapshots --json`.
// SizeBytes / FileCount come from the embedded summary block on
// restic 0.16+; older clients leave them at zero (the UI degrades
// gracefully).
type Snapshot struct {
	ID        string    `json:"id"`       // long restic snapshot ID
	ShortID   string    `json:"short_id"` // 8-hex-char form
	Time      time.Time `json:"time"`
	Hostname  string    `json:"hostname"`
	Paths     []string  `json:"paths"`
	Tags      []string  `json:"tags,omitempty"`
	SizeBytes int64     `json:"size_bytes,omitempty"`
	FileCount int64     `json:"file_count,omitempty"`
}

// RepoStatsPayload — agent reports periodic repo health facts derived
// from `restic stats` and lock-file inspection.
type RepoStatsPayload struct {
	SizeBytes       int64     `json:"size_bytes"`
	SnapshotCount   int       `json:"snapshot_count"`
	DedupRatio      float64   `json:"dedup_ratio"`
	LastCheckAt     time.Time `json:"last_check_at,omitempty"`
	LastCheckStatus string    `json:"last_check_status,omitempty"`
	LockState       string    `json:"lock_state"` // locked|unlocked
}

// Schedule is the agent-facing view of a Schedule row. (Server-side
// CRUD shapes live in the http handlers; this is what gets pushed.)
type Schedule struct {
	ID              string          `json:"id"`
	Kind            JobKind         `json:"kind"`
	CronExpr        string          `json:"cron_expr"`
	Paths           []string        `json:"paths,omitempty"`
	Excludes        []string        `json:"excludes,omitempty"`
	Tags            []string        `json:"tags,omitempty"`
	RetentionPolicy json.RawMessage `json:"retention_policy,omitempty"`
	Options         json.RawMessage `json:"options,omitempty"`
	PreHook         string          `json:"pre_hook,omitempty"`
	PostHook        string          `json:"post_hook,omitempty"`
	Enabled         bool            `json:"enabled"`
}

// ScheduleSetPayload — server pushes the full canonical schedule list
// for a host. Agent reconciles its local cron and replies with
// ScheduleAckPayload carrying the same Version.
type ScheduleSetPayload struct {
	Version   int64      `json:"version"`
	Schedules []Schedule `json:"schedules"`
}

// ScheduleAckPayload — agent confirms it has applied a given version.
type ScheduleAckPayload struct {
	Version   int64     `json:"version"`
	AppliedAt time.Time `json:"applied_at"`
}

// ScheduleFirePayload — agent reports a local cron entry just fired.
// Server is expected to look up the schedule, build a CommandRun
// payload from it, persist a job row, and return MsgCommandRun on
// the same connection. ScheduledAt is the wall-clock time the
// agent's cron fired (audit / forensic value when network jitter
// pushes the actual command.run dispatch later).
type ScheduleFirePayload struct {
	ScheduleID  string    `json:"schedule_id"`
	ScheduledAt time.Time `json:"scheduled_at"`
}

// ConfigUpdatePayload — server pushes per-host config (currently just
// repo connection details). Empty fields mean "leave existing alone";
// to clear something, send an explicit zero value.
type ConfigUpdatePayload struct {
	RepoURL        string `json:"repo_url,omitempty"`
	RepoPassword   string `json:"repo_password,omitempty"` // sensitive
	RepoUsername   string `json:"repo_username,omitempty"`
	RepoCredential string `json:"repo_credential,omitempty"` // sensitive (for rest server basic auth)
	HookShell      string `json:"hook_shell,omitempty"`
}

// AgentUpdateAvailablePayload — informational only; the agent does
// NOT self-update. See spec.md §4.2 for the package-manager-based
// update model.
type AgentUpdateAvailablePayload struct {
	LatestVersion string `json:"latest_version"`
	PackageURL    string `json:"package_url"` // apt repo / choco source
	Changelog     string `json:"changelog,omitempty"`
}