Files
restic-manager/internal/api/wire.go
T
steve 6450bf1b88
CI / Test (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Build (windows/amd64) (push) Has been cancelled
CI / Build (linux/amd64) (push) Has been cancelled
CI / Build (linux/arm64) (push) Has been cancelled
P2-02 (agent side) + P2-03: agent scheduler + schedule.fire dispatch
Closes the schedule reconciliation loop end-to-end.

* New `internal/agent/scheduler` package wraps robfig/cron/v3 with
  the lifecycle the agent needs:
  - Apply(ScheduleSetPayload, Sender) stops the prior cron (waiting
    for in-flight entries to return), rebuilds from scratch, starts,
    and emits schedule.ack with the version we just applied.
  - Disabled entries skipped silently; bad cron exprs (which
    shouldn't reach us — the server validates — but defensive)
    log a warn and skip.
  - On each cron tick the entry sends a new schedule.fire envelope
    to the server with {schedule_id, scheduled_at}. The scheduler
    itself never builds CommandRunPayloads — server is the source
    of truth for jobs.
  - tx is swapped on every Apply, so reconnect is handled
    naturally: cron entries that fire against a dropped tx log
    "no active connection" and skip the tick.
  - Stop() is idempotent and waits for the cron's in-flight
    workers via cron.Stop().Done().

* New wire message api.MsgScheduleFire + api.ScheduleFirePayload
  for the agent → server "I just fired locally" RPC.

* Server-side dispatch (schedule_push.go: dispatchScheduledJob):
  looks up the schedule by id, validates ownership + that it's
  enabled, builds args from kind (paths for backup; other kinds
  are still arg-less in Phase 2 and grow as those job kinds land
  in P2-05..08), persists a jobs row with actor_kind=schedule +
  scheduled_id, and writes command.run back on the same conn so
  the agent runs through its existing dispatch path.

* store.CreateJob now writes scheduled_id. This column was in the
  schema since 0001 but never populated — the original P1 path
  only had operator-driven jobs, so actor_kind was always 'user'
  and scheduled_id was always nil.

* cmd/agent/main.go integration: dispatcher gains a
  *scheduler.Scheduler; the MsgScheduleSet case now hands the
  payload to scheduler.Apply (in a goroutine so the WS read loop
  keeps draining other messages).

* WS dispatcher gains OnScheduleFire alongside OnScheduleAck.

* Tests:
  - scheduler unit tests (4): ack-on-apply, cron tick fires
    schedule.fire envelope, disabled entries don't fire, replace-
    prior-state stops the old cron.
  - Server-side end-to-end: schedule.fire → command.run with the
    right job_id / kind / args, plus jobs row with actor_kind=
    "schedule" and scheduled_id linking back to the schedule.

Persistence of next-fire times across agent restarts is
deliberately deferred. A missed fire window during downtime
simply fires once on reconnect — that's the desirable behaviour
(the operator wants the missed backup to run, not be silently
skipped because we lost track of when it was due).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:29:12 +01:00

88 lines
2.8 KiB
Go

package api
import (
"encoding/json"
"fmt"
)
// MessageType enumerates every kind of envelope that can flow over
// the agent ↔ server WebSocket. Keeping these as string constants
// (not iota ints) makes traffic readable in logs and packet captures.
type MessageType string
// Agent → server message types.
const (
MsgHello MessageType = "hello"
MsgHeartbeat MessageType = "heartbeat"
MsgJobStarted MessageType = "job.started"
MsgJobProgress MessageType = "job.progress"
MsgJobFinished MessageType = "job.finished"
MsgSnapshotsRpt MessageType = "snapshots.report"
MsgRepoStats MessageType = "repo.stats"
MsgLogStream MessageType = "log.stream"
MsgScheduleAck MessageType = "schedule.ack"
MsgScheduleFire MessageType = "schedule.fire" // agent: a local cron entry fired, please dispatch a job
MsgCommandResult MessageType = "command.result" // ack for command.run
MsgError MessageType = "error"
)
// Server → agent message types.
const (
MsgCommandRun MessageType = "command.run"
MsgCommandCancel MessageType = "command.cancel"
MsgScheduleSet MessageType = "schedule.set"
MsgConfigUpdate MessageType = "config.update"
MsgAgentUpdateAvail MessageType = "agent.update.available"
)
// Envelope is the framing for every WS message in either direction.
// Payload is parsed into the concrete struct chosen by Type.
//
// ID is set on RPC-style messages (command.run / command.result) so
// responses can be correlated. For one-shot pushes (heartbeat,
// job.progress) it is empty.
type Envelope struct {
Type MessageType `json:"type"`
ID string `json:"id,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
// Marshal builds an envelope from a concrete payload struct.
func Marshal(t MessageType, id string, payload any) (Envelope, error) {
if payload == nil {
return Envelope{Type: t, ID: id}, nil
}
raw, err := json.Marshal(payload)
if err != nil {
return Envelope{}, fmt.Errorf("marshal %s payload: %w", t, err)
}
return Envelope{Type: t, ID: id, Payload: raw}, nil
}
// UnmarshalPayload decodes the envelope's payload into v.
func (e Envelope) UnmarshalPayload(v any) error {
if len(e.Payload) == 0 {
return nil
}
return json.Unmarshal(e.Payload, v)
}
// ErrorCode enumerates error reasons surfaced over the wire.
// These are stable identifiers; client code may switch on them.
type ErrorCode string
const (
ErrProtocolTooOld ErrorCode = "protocol_too_old"
ErrProtocolTooNew ErrorCode = "protocol_too_new"
ErrUnauthorized ErrorCode = "unauthorized"
ErrBadRequest ErrorCode = "bad_request"
ErrInternal ErrorCode = "internal"
)
// ErrorPayload is the body of an `error` envelope.
type ErrorPayload struct {
Code ErrorCode `json:"code"`
Message string `json:"message"`
HelpURL string `json:"help_url,omitempty"`
}