e871b05b38
CI / Test (linux/amd64) (pull_request) Successful in 34s
CI / Lint (pull_request) Failing after 16s
CI / Build (windows/amd64) (pull_request) Successful in 22s
CI / Build (linux/amd64) (pull_request) Successful in 20s
CI / Build (linux/arm64) (pull_request) Successful in 21s
Cleanup pass over the repo so CI can enforce lint going forward
without the only-new-issues escape hatch:
* gofumpt -w across the tree (31 hits, all formatting)
* misspell --fix (25 hits, US-locale spelling) — but reverted on
api.JobCancelled = "cancelled" since that literal is the wire +
DB CHECK constraint value, plus matched the case in store/fleet.go
back to "cancelled" and added //nolint:misspell on both for the
next time someone reaches for the auto-fix
* Wrap every `defer rows.Close()` / `defer stmt.Close()` /
`defer res.Body.Close()` in `defer func() { _ = x.Close() }()`
to satisfy errcheck without losing the close itself
* websocket.Dial callers (1 prod, 4 tests) now capture + close the
upgrade response Body — coder/websocket can return res with a nil
Body on success, so the test deferred-closes guard against that
* Annotate the two genuine-by-design nilerr cases with //nolint
comments explaining why nil-on-error is the contract (cookie
missing = no session; ctx cancelled mid-backoff = clean shutdown)
* Add brief godoc on the 10 exported const groups + types that
revive flagged (api.HostOS/HostArch/JobKind/JobStatus/LogStream/
ErrorCode, restic.EventKind, store.Role, web.FS)
* Drop the unused (*Server).userByID method
* Inline the unparam baseView(active) — every UI page is under
the dashboard primary nav today
Result: `golangci-lint run ./...` reports 0 issues. CI lint job
no longer needs only-new-issues: true; X-06 follow-up entry in
tasks.md removed.
226 lines
8.0 KiB
Go
226 lines
8.0 KiB
Go
// schedule_push.go — server → agent reconciliation push and the
|
|
// inbound schedule.fire dispatch.
|
|
//
|
|
// The slim-schedule wire shape is built here from the (Schedule,
|
|
// SourceGroup) pair. Each schedule is sent with its resolved source
|
|
// groups inlined so the agent doesn't have to keep its own copy of
|
|
// the group catalog. Cron + enabled drive the agent's local timer;
|
|
// when an entry fires the agent ships back a schedule.fire and
|
|
// dispatchScheduledJob below resolves the schedule's groups and
|
|
// dispatches one backup command.run per group.
|
|
package http
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid/v2"
|
|
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/server/ws"
|
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
|
)
|
|
|
|
// pushScheduleSetOnConn ships the canonical schedule set straight down
|
|
// the freshly-accepted hello connection. Callers are inside the
|
|
// hello window — using the conn directly avoids racing the hub's
|
|
// register-then-supersede sequence.
|
|
func (s *Server) pushScheduleSetOnConn(ctx context.Context, hostID string, conn *ws.Conn) {
|
|
payload, err := s.buildScheduleSetPayload(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("schedule push: build payload", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
env, err := api.Marshal(api.MsgScheduleSet, "", payload)
|
|
if err != nil {
|
|
slog.Warn("schedule push: marshal payload", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
if err := conn.Send(sendCtx, env); err != nil {
|
|
slog.Warn("schedule push on-hello: send", "host_id", hostID, "err", err)
|
|
}
|
|
}
|
|
|
|
// pushScheduleSetAsync pushes the latest schedule set to a connected
|
|
// agent (via the hub) on a best-effort basis. Mutations call this
|
|
// after a successful CRUD; offline agents pick the new version up on
|
|
// next reconnect via pushScheduleSetOnConn.
|
|
func (s *Server) pushScheduleSetAsync(hostID string) {
|
|
if s.deps.Hub == nil || !s.deps.Hub.Connected(hostID) {
|
|
return
|
|
}
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
payload, err := s.buildScheduleSetPayload(ctx, hostID)
|
|
if err != nil {
|
|
slog.Warn("schedule push async: build payload", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
env, err := api.Marshal(api.MsgScheduleSet, "", payload)
|
|
if err != nil {
|
|
slog.Warn("schedule push async: marshal", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
if err := s.deps.Hub.Send(ctx, hostID, env); err != nil {
|
|
slog.Debug("schedule push async: send", "host_id", hostID, "err", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// buildScheduleSetPayload assembles the canonical wire shape: every
|
|
// schedule for the host with its source groups resolved inline.
|
|
func (s *Server) buildScheduleSetPayload(ctx context.Context, hostID string) (api.ScheduleSetPayload, error) {
|
|
version, err := s.deps.Store.GetHostScheduleVersion(ctx, hostID)
|
|
if err != nil {
|
|
return api.ScheduleSetPayload{}, err
|
|
}
|
|
schedules, err := s.deps.Store.ListSchedulesByHost(ctx, hostID)
|
|
if err != nil {
|
|
return api.ScheduleSetPayload{}, err
|
|
}
|
|
groups, err := s.deps.Store.ListSourceGroupsByHost(ctx, hostID)
|
|
if err != nil {
|
|
return api.ScheduleSetPayload{}, err
|
|
}
|
|
groupByID := make(map[string]store.SourceGroup, len(groups))
|
|
for _, g := range groups {
|
|
groupByID[g.ID] = g
|
|
}
|
|
|
|
out := api.ScheduleSetPayload{Version: version, Schedules: make([]api.Schedule, 0, len(schedules))}
|
|
for _, sc := range schedules {
|
|
entry := api.Schedule{
|
|
ID: sc.ID,
|
|
CronExpr: sc.CronExpr,
|
|
Enabled: sc.Enabled,
|
|
SourceGroups: make([]api.ScheduleSourceGroup, 0, len(sc.SourceGroupIDs)),
|
|
}
|
|
for _, gid := range sc.SourceGroupIDs {
|
|
g, ok := groupByID[gid]
|
|
if !ok {
|
|
continue
|
|
}
|
|
retention, _ := json.Marshal(g.RetentionPolicy)
|
|
entry.SourceGroups = append(entry.SourceGroups, api.ScheduleSourceGroup{
|
|
Name: g.Name,
|
|
Includes: g.Includes,
|
|
Excludes: g.Excludes,
|
|
RetentionPolicy: retention,
|
|
RetryMax: g.RetryMax,
|
|
RetryBackoffSeconds: g.RetryBackoffSeconds,
|
|
})
|
|
}
|
|
out.Schedules = append(out.Schedules, entry)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// applyScheduleAck persists the version the agent has confirmed.
|
|
func (s *Server) applyScheduleAck(ctx context.Context, hostID string, version int64, appliedAt time.Time) {
|
|
if err := s.deps.Store.SetHostAppliedScheduleVersion(ctx, hostID, version); err != nil {
|
|
slog.Warn("schedule.ack: persist applied version", "host_id", hostID, "err", err)
|
|
}
|
|
}
|
|
|
|
// dispatchScheduledJob handles an agent's schedule.fire. Resolves the
|
|
// schedule's source groups and dispatches one backup command.run per
|
|
// group, persisting each as a job row with actor_kind=schedule and
|
|
// scheduled_id pointing at the schedule.
|
|
func (s *Server) dispatchScheduledJob(ctx context.Context, hostID string, conn *ws.Conn, scheduleID string, scheduledAt time.Time) {
|
|
sc, err := s.deps.Store.GetSchedule(ctx, hostID, scheduleID)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
slog.Info("schedule.fire: schedule unknown, ignoring",
|
|
"host_id", hostID, "schedule_id", scheduleID)
|
|
return
|
|
}
|
|
slog.Warn("schedule.fire: load schedule", "host_id", hostID, "err", err)
|
|
return
|
|
}
|
|
if !sc.Enabled {
|
|
slog.Info("schedule.fire: schedule disabled, ignoring",
|
|
"host_id", hostID, "schedule_id", scheduleID)
|
|
return
|
|
}
|
|
if len(sc.SourceGroupIDs) == 0 {
|
|
slog.Warn("schedule.fire: schedule has no source groups",
|
|
"host_id", hostID, "schedule_id", scheduleID)
|
|
return
|
|
}
|
|
for _, gid := range sc.SourceGroupIDs {
|
|
g, err := s.deps.Store.GetSourceGroup(ctx, hostID, gid)
|
|
if err != nil {
|
|
slog.Warn("schedule.fire: load source group",
|
|
"host_id", hostID, "schedule_id", scheduleID, "group_id", gid, "err", err)
|
|
continue
|
|
}
|
|
s.dispatchBackupForGroup(ctx, conn, hostID, scheduleID, g, scheduledAt)
|
|
}
|
|
}
|
|
|
|
// dispatchBackupForGroup builds and sends a single backup command.run
|
|
// envelope on conn for the given group. Persists the job row first so
|
|
// the live log viewer can subscribe to it.
|
|
// dispatchBackupForGroup persists a backup job row, sends the
|
|
// command.run envelope to the agent, and audit-logs the dispatch.
|
|
// Returns the persisted job ID on success, or "" on any failure
|
|
// (failures are slog.Warn-ed). Callers may use the returned ID to,
|
|
// e.g., redirect the UI to the live job log.
|
|
func (s *Server) dispatchBackupForGroup(ctx context.Context, conn *ws.Conn, hostID, scheduleID string, g *store.SourceGroup, scheduledAt time.Time) string {
|
|
jobID := ulid.Make().String()
|
|
now := time.Now().UTC()
|
|
scheduleRef := scheduleID
|
|
if err := s.deps.Store.CreateJob(ctx, store.Job{
|
|
ID: jobID,
|
|
HostID: hostID,
|
|
Kind: string(api.JobBackup),
|
|
ScheduledID: &scheduleRef,
|
|
ActorKind: "schedule",
|
|
CreatedAt: now,
|
|
}); err != nil {
|
|
slog.Warn("schedule.fire: persist job", "host_id", hostID,
|
|
"schedule_id", scheduleID, "group", g.Name, "err", err)
|
|
return ""
|
|
}
|
|
// Backup ignores RetentionPolicy — the forget cadence lives on
|
|
// host_repo_maintenance and is driven by the server-side ticker
|
|
// (P2R-06). Don't ship the field on backup dispatches.
|
|
env, err := api.Marshal(api.MsgCommandRun, jobID, api.CommandRunPayload{
|
|
JobID: jobID,
|
|
Kind: api.JobBackup,
|
|
Includes: g.Includes,
|
|
Excludes: g.Excludes,
|
|
Tag: g.Name,
|
|
})
|
|
if err != nil {
|
|
slog.Warn("schedule.fire: marshal command.run",
|
|
"host_id", hostID, "schedule_id", scheduleID, "err", err)
|
|
return ""
|
|
}
|
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
if err := conn.Send(sendCtx, env); err != nil {
|
|
slog.Warn("schedule.fire: send command.run",
|
|
"host_id", hostID, "schedule_id", scheduleID, "err", err)
|
|
return ""
|
|
}
|
|
_ = s.deps.Store.AppendAudit(ctx, store.AuditEntry{
|
|
ID: ulid.Make().String(),
|
|
Actor: "schedule",
|
|
Action: "job.run_now",
|
|
TargetKind: ptr("job"),
|
|
TargetID: &jobID,
|
|
TS: now,
|
|
})
|
|
slog.Info("schedule.fire: dispatched backup",
|
|
"host_id", hostID, "schedule_id", scheduleID,
|
|
"group", g.Name, "job_id", jobID, "scheduled_at", scheduledAt)
|
|
return jobID
|
|
}
|