package store import ( "context" "database/sql" "encoding/json" "errors" "fmt" "time" ) // Job mirrors the jobs table. type Job struct { ID string HostID string Kind string Status string ScheduledID *string ActorKind string // user|schedule|system ActorID *string StartedAt *time.Time FinishedAt *time.Time ExitCode *int Stats json.RawMessage Error *string CreatedAt time.Time } // CreateJob inserts a queued job. The agent will mark it running // when it actually starts work. func (s *Store) CreateJob(ctx context.Context, j Job) error { _, err := s.db.ExecContext(ctx, `INSERT INTO jobs (id, host_id, kind, status, actor_kind, actor_id, created_at) VALUES (?, ?, ?, 'queued', ?, ?, ?)`, j.ID, j.HostID, j.Kind, j.ActorKind, nullable(j.ActorID), j.CreatedAt.UTC().Format(time.RFC3339Nano)) if err != nil { return fmt.Errorf("store: create job: %w", err) } return nil } // MarkJobStarted flips status to 'running' and records started_at. func (s *Store) MarkJobStarted(ctx context.Context, id string, when time.Time) error { res, err := s.db.ExecContext(ctx, `UPDATE jobs SET status = 'running', started_at = ? WHERE id = ? AND status IN ('queued','running')`, when.UTC().Format(time.RFC3339Nano), id) if err != nil { return fmt.Errorf("store: mark started: %w", err) } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } // MarkJobFinished records the terminal state. func (s *Store) MarkJobFinished(ctx context.Context, id, status string, exitCode int, stats json.RawMessage, errMsg string, when time.Time) error { if len(stats) == 0 { stats = json.RawMessage("null") } res, err := s.db.ExecContext(ctx, `UPDATE jobs SET status = ?, finished_at = ?, exit_code = ?, stats = ?, error = ? WHERE id = ?`, status, when.UTC().Format(time.RFC3339Nano), exitCode, string(stats), nullableStr(errMsg), id) if err != nil { return fmt.Errorf("store: mark finished: %w", err) } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } // AppendJobLog records one line of agent output. seq is the agent's // monotonic sequence number; gaps imply lost data. func (s *Store) AppendJobLog(ctx context.Context, jobID string, seq int64, ts time.Time, stream, payload string) error { _, err := s.db.ExecContext(ctx, `INSERT INTO job_logs (job_id, seq, ts, stream, payload) VALUES (?,?,?,?,?)`, jobID, seq, ts.UTC().Format(time.RFC3339Nano), stream, payload) if err != nil { return fmt.Errorf("store: append job log: %w", err) } return nil } // GetJob returns a job row. func (s *Store) GetJob(ctx context.Context, id string) (*Job, error) { row := s.db.QueryRowContext(ctx, `SELECT id, host_id, kind, status, scheduled_id, actor_kind, actor_id, started_at, finished_at, exit_code, stats, error, created_at FROM jobs WHERE id = ?`, id) var ( j Job schedID sql.NullString actorID sql.NullString startedAt sql.NullString finishedAt sql.NullString exitCode sql.NullInt64 stats sql.NullString errMsg sql.NullString createdAt string ) if err := row.Scan(&j.ID, &j.HostID, &j.Kind, &j.Status, &schedID, &j.ActorKind, &actorID, &startedAt, &finishedAt, &exitCode, &stats, &errMsg, &createdAt); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } return nil, fmt.Errorf("store: scan job: %w", err) } if schedID.Valid { s := schedID.String j.ScheduledID = &s } if actorID.Valid { s := actorID.String j.ActorID = &s } if startedAt.Valid { t, _ := time.Parse(time.RFC3339Nano, startedAt.String) j.StartedAt = &t } if finishedAt.Valid { t, _ := time.Parse(time.RFC3339Nano, finishedAt.String) j.FinishedAt = &t } if exitCode.Valid { i := int(exitCode.Int64) j.ExitCode = &i } if stats.Valid && stats.String != "" { j.Stats = json.RawMessage(stats.String) } if errMsg.Valid { s := errMsg.String j.Error = &s } t, _ := time.Parse(time.RFC3339Nano, createdAt) j.CreatedAt = t return &j, nil } func nullableStr(s string) any { if s == "" { return nil } return s }