608962441b
Closes the schedule reconciliation loop end-to-end.

* New `internal/agent/scheduler` package wraps robfig/cron/v3 with
  the lifecycle the agent needs:
  - Apply(ScheduleSetPayload, Sender) stops the prior cron (waiting
    for in-flight entries to return), rebuilds from scratch, starts,
    and emits schedule.ack with the version we just applied.
  - Disabled entries are skipped silently; bad cron exprs (which
    shouldn't reach us, since the server validates, but we stay
    defensive) log a warn and are skipped.
  - On each cron tick the entry sends a new schedule.fire envelope
    to the server with {schedule_id, scheduled_at}. The scheduler
    itself never builds CommandRunPayloads; the server is the source
    of truth for jobs.
  - tx is swapped on every Apply, so reconnect is handled
    naturally: cron entries that fire against a dropped tx log
    "no active connection" and skip the tick.
  - Stop() is idempotent and waits for the cron's in-flight
    workers via cron.Stop().Done().
* New wire message api.MsgScheduleFire + api.ScheduleFirePayload
  for the agent → server "I just fired locally" RPC.
* Server-side dispatch (schedule_push.go: dispatchScheduledJob):
  looks up the schedule by id, validates ownership + that it's
  enabled, builds args from kind (paths for backup; other kinds
  are still arg-less in Phase 2 and grow as those job kinds land
  in P2-05..08), persists a jobs row with actor_kind=schedule +
  scheduled_id, and writes command.run back on the same conn so
  the agent runs through its existing dispatch path.
* store.CreateJob now writes scheduled_id. This column has been in
  the schema since 0001 but was never populated: the original P1
  path only had operator-driven jobs, so actor_kind was always
  'user' and scheduled_id was always nil.
* cmd/agent/main.go integration: dispatcher gains a
  *scheduler.Scheduler; the MsgScheduleSet case now hands the
  payload to scheduler.Apply (in a goroutine so the WS read loop
  keeps draining other messages).
* WS dispatcher gains OnScheduleFire alongside OnScheduleAck.
* Tests:
  - scheduler unit tests (4): ack-on-apply, cron tick fires
    schedule.fire envelope, disabled entries don't fire,
    replace-prior-state stops the old cron.
  - Server-side end-to-end: schedule.fire → command.run with the
    right job_id / kind / args, plus jobs row with
    actor_kind="schedule" and scheduled_id linking back to the
    schedule.

Persistence of next-fire times across agent restarts is
deliberately deferred. A missed fire window during downtime
simply fires once on reconnect, which is the desirable behaviour
(the operator wants the missed backup to run, not be silently
skipped because we lost track of when it was due).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
88 lines
2.8 KiB
Go
package api

import (
	"encoding/json"
	"fmt"
)

// MessageType enumerates every kind of envelope that can flow over
// the agent ↔ server WebSocket. Keeping these as string constants
// (not iota ints) makes traffic readable in logs and packet captures.
type MessageType string

// Agent → server message types.
const (
	MsgHello         MessageType = "hello"
	MsgHeartbeat     MessageType = "heartbeat"
	MsgJobStarted    MessageType = "job.started"
	MsgJobProgress   MessageType = "job.progress"
	MsgJobFinished   MessageType = "job.finished"
	MsgSnapshotsRpt  MessageType = "snapshots.report"
	MsgRepoStats     MessageType = "repo.stats"
	MsgLogStream     MessageType = "log.stream"
	MsgScheduleAck   MessageType = "schedule.ack"
	MsgScheduleFire  MessageType = "schedule.fire"  // agent: a local cron entry fired, please dispatch a job
	MsgCommandResult MessageType = "command.result" // ack for command.run
	MsgError         MessageType = "error"
)

// Server → agent message types.
const (
	MsgCommandRun       MessageType = "command.run"
	MsgCommandCancel    MessageType = "command.cancel"
	MsgScheduleSet      MessageType = "schedule.set"
	MsgConfigUpdate     MessageType = "config.update"
	MsgAgentUpdateAvail MessageType = "agent.update.available"
)

// Envelope is the framing for every WS message in either direction.
// Payload is parsed into the concrete struct chosen by Type.
//
// ID is set on RPC-style messages (command.run / command.result) so
// responses can be correlated. For one-shot pushes (heartbeat,
// job.progress) it is empty.
type Envelope struct {
	Type    MessageType     `json:"type"`
	ID      string          `json:"id,omitempty"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// Marshal builds an envelope from a concrete payload struct.
func Marshal(t MessageType, id string, payload any) (Envelope, error) {
	if payload == nil {
		return Envelope{Type: t, ID: id}, nil
	}
	raw, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("marshal %s payload: %w", t, err)
	}
	return Envelope{Type: t, ID: id, Payload: raw}, nil
}

// UnmarshalPayload decodes the envelope's payload into v.
func (e Envelope) UnmarshalPayload(v any) error {
	if len(e.Payload) == 0 {
		return nil
	}
	return json.Unmarshal(e.Payload, v)
}

// ErrorCode enumerates error reasons surfaced over the wire.
// These are stable identifiers; client code may switch on them.
type ErrorCode string

const (
	ErrProtocolTooOld ErrorCode = "protocol_too_old"
	ErrProtocolTooNew ErrorCode = "protocol_too_new"
	ErrUnauthorized   ErrorCode = "unauthorized"
	ErrBadRequest     ErrorCode = "bad_request"
	ErrInternal       ErrorCode = "internal"
)

// ErrorPayload is the body of an `error` envelope.
type ErrorPayload struct {
	Code    ErrorCode `json:"code"`
	Message string    `json:"message"`
	HelpURL string    `json:"help_url,omitempty"`
}
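
A usage sketch of the framing above: one `error` envelope is encoded to wire bytes, decoded again, and its payload recovered after checking Type. The type and function definitions are trimmed copies of the file above, repeated only so the example compiles on its own; `roundTrip` and the message text are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type MessageType string

const MsgError MessageType = "error"

type Envelope struct {
	Type    MessageType     `json:"type"`
	ID      string          `json:"id,omitempty"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// Marshal builds an envelope from a concrete payload struct.
func Marshal(t MessageType, id string, payload any) (Envelope, error) {
	if payload == nil {
		return Envelope{Type: t, ID: id}, nil
	}
	raw, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("marshal %s payload: %w", t, err)
	}
	return Envelope{Type: t, ID: id, Payload: raw}, nil
}

// UnmarshalPayload decodes the envelope's payload into v.
func (e Envelope) UnmarshalPayload(v any) error {
	if len(e.Payload) == 0 {
		return nil
	}
	return json.Unmarshal(e.Payload, v)
}

type ErrorCode string

const ErrBadRequest ErrorCode = "bad_request"

type ErrorPayload struct {
	Code    ErrorCode `json:"code"`
	Message string    `json:"message"`
	HelpURL string    `json:"help_url,omitempty"`
}

// roundTrip (hypothetical helper) sends an ErrorPayload through the same
// steps a sender and a receiver would perform, returning the decoded
// payload together with the intermediate wire bytes.
func roundTrip(in ErrorPayload) (ErrorPayload, []byte, error) {
	env, err := Marshal(MsgError, "", in)
	if err != nil {
		return ErrorPayload{}, nil, err
	}
	wire, err := json.Marshal(env)
	if err != nil {
		return ErrorPayload{}, nil, err
	}
	var got Envelope
	if err := json.Unmarshal(wire, &got); err != nil {
		return ErrorPayload{}, nil, err
	}
	if got.Type != MsgError {
		return ErrorPayload{}, nil, fmt.Errorf("unexpected type %q", got.Type)
	}
	var out ErrorPayload
	if err := got.UnmarshalPayload(&out); err != nil {
		return ErrorPayload{}, nil, err
	}
	return out, wire, nil
}

func main() {
	out, wire, err := roundTrip(ErrorPayload{Code: ErrBadRequest, Message: "unknown schedule"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wire))
	fmt.Printf("%s: %s\n", out.Code, out.Message)
}
```

Because UnmarshalPayload returns nil for an empty payload, the target keeps its zero value in that case; a handler that requires a body would need to check `len(e.Payload)` itself.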