package store import ( "context" "database/sql" "errors" "fmt" "time" ) // CreateSchedule inserts a new slim schedule row + the schedule_source_groups // junction entries for s.SourceGroupIDs, atomic in one tx. Bumps // host_schedule_version. Caller mints s.ID. func (st *Store) CreateSchedule(ctx context.Context, s *Schedule) error { if s.ID == "" || s.HostID == "" { return errors.New("store: schedule id and host_id required") } now := time.Now().UTC() s.CreatedAt = now s.UpdatedAt = now tx, err := st.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("store: begin tx: %w", err) } defer func() { _ = tx.Rollback() }() if _, err := tx.ExecContext(ctx, `INSERT INTO schedules (id, host_id, cron_expr, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)`, s.ID, s.HostID, s.CronExpr, boolToInt(s.Enabled), now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), ); err != nil { return fmt.Errorf("store: create schedule: %w", err) } if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil { return err } if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { return err } return tx.Commit() } // UpdateSchedule replaces every editable field on an existing row // and rewrites the junction. Bumps host_schedule_version. func (st *Store) UpdateSchedule(ctx context.Context, s *Schedule) error { if s.ID == "" || s.HostID == "" { return errors.New("store: schedule id and host_id required") } now := time.Now().UTC() tx, err := st.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("store: begin tx: %w", err) } defer func() { _ = tx.Rollback() }() res, err := tx.ExecContext(ctx, `UPDATE schedules SET cron_expr = ?, enabled = ?, updated_at = ? WHERE id = ? AND host_id = ?`, s.CronExpr, boolToInt(s.Enabled), now.Format(time.RFC3339Nano), s.ID, s.HostID, ) if err != nil { return fmt.Errorf("store: update schedule: %w", err) } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } s.UpdatedAt = now // Rewrite junction wholesale — simpler than diffing. if _, err := tx.ExecContext(ctx, `DELETE FROM schedule_source_groups WHERE schedule_id = ?`, s.ID, ); err != nil { return fmt.Errorf("store: clear schedule junction: %w", err) } if err := writeScheduleGroupsTx(ctx, tx, s.ID, s.SourceGroupIDs); err != nil { return err } if err := bumpHostScheduleVersionTx(ctx, tx, s.HostID); err != nil { return err } return tx.Commit() } // DeleteSchedule removes a schedule and its junction rows; bumps // host_schedule_version. Returns ErrNotFound if no row matched. func (st *Store) DeleteSchedule(ctx context.Context, hostID, scheduleID string) error { tx, err := st.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("store: begin tx: %w", err) } defer func() { _ = tx.Rollback() }() res, err := tx.ExecContext(ctx, `DELETE FROM schedules WHERE id = ? AND host_id = ?`, scheduleID, hostID) if err != nil { return fmt.Errorf("store: delete schedule: %w", err) } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } // Junction rows go via ON DELETE CASCADE; nothing to do here. if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { return err } return tx.Commit() } // GetSchedule returns one schedule (with its junction-resolved // SourceGroupIDs populated) by (host_id, id). ErrNotFound on miss. func (st *Store) GetSchedule(ctx context.Context, hostID, scheduleID string) (*Schedule, error) { row := st.db.QueryRowContext(ctx, `SELECT id, host_id, cron_expr, enabled, created_at, updated_at FROM schedules WHERE id = ? AND host_id = ?`, scheduleID, hostID) s, err := scanSchedule(row) if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } if err != nil { return nil, err } s.SourceGroupIDs, err = st.scheduleGroupIDs(ctx, scheduleID) if err != nil { return nil, err } return s, nil } // ListSchedulesByHost returns every schedule for a host, with // SourceGroupIDs resolved. func (st *Store) ListSchedulesByHost(ctx context.Context, hostID string) ([]Schedule, error) { rows, err := st.db.QueryContext(ctx, `SELECT id, host_id, cron_expr, enabled, created_at, updated_at FROM schedules WHERE host_id = ? ORDER BY created_at`, hostID) if err != nil { return nil, fmt.Errorf("store: list schedules: %w", err) } defer rows.Close() out := []Schedule{} for rows.Next() { s, err := scanScheduleRow(rows) if err != nil { return nil, err } out = append(out, *s) } if err := rows.Err(); err != nil { return nil, err } // Second pass to resolve junctions — small fleet, cheap. for i := range out { ids, err := st.scheduleGroupIDs(ctx, out[i].ID) if err != nil { return nil, err } out[i].SourceGroupIDs = ids } return out, nil } // GetHostScheduleVersion returns the current version for a host, or // 0 if no row exists yet. func (st *Store) GetHostScheduleVersion(ctx context.Context, hostID string) (int64, error) { var v int64 err := st.db.QueryRowContext(ctx, `SELECT version FROM host_schedule_version WHERE host_id = ?`, hostID).Scan(&v) if errors.Is(err, sql.ErrNoRows) { return 0, nil } if err != nil { return 0, fmt.Errorf("store: get schedule version: %w", err) } return v, nil } // SetHostAppliedScheduleVersion records the version the agent has // confirmed via schedule.ack. Idempotent. func (st *Store) SetHostAppliedScheduleVersion(ctx context.Context, hostID string, version int64) error { _, err := st.db.ExecContext(ctx, `UPDATE hosts SET applied_schedule_version = ? WHERE id = ?`, version, hostID) if err != nil { return fmt.Errorf("store: set applied schedule version: %w", err) } return nil } // BumpHostScheduleVersion is the public wrapper for non-schedule // CRUD that needs to push to the agent — e.g. source-group edits // (paths/retention change), retry-policy edits. func (st *Store) BumpHostScheduleVersion(ctx context.Context, hostID string) error { tx, err := st.db.BeginTx(ctx, nil) if err != nil { return err } defer func() { _ = tx.Rollback() }() if err := bumpHostScheduleVersionTx(ctx, tx, hostID); err != nil { return err } return tx.Commit() } // bumpHostScheduleVersionTx upserts host_schedule_version, +1 each // call. Caller owns the tx. func bumpHostScheduleVersionTx(ctx context.Context, tx *sql.Tx, hostID string) error { if _, err := tx.ExecContext(ctx, `INSERT INTO host_schedule_version (host_id, version) VALUES (?, 1) ON CONFLICT(host_id) DO UPDATE SET version = version + 1`, hostID); err != nil { return fmt.Errorf("store: bump schedule version: %w", err) } return nil } // writeScheduleGroupsTx inserts the junction rows for one schedule. // Caller owns the tx; assumes the table is already empty for this id // (Update wipes before calling; Create starts empty). func writeScheduleGroupsTx(ctx context.Context, tx *sql.Tx, scheduleID string, groupIDs []string) error { for _, gid := range groupIDs { if gid == "" { continue } if _, err := tx.ExecContext(ctx, `INSERT INTO schedule_source_groups (schedule_id, source_group_id) VALUES (?, ?)`, scheduleID, gid, ); err != nil { return fmt.Errorf("store: link schedule %s to group %s: %w", scheduleID, gid, err) } } return nil } // scheduleGroupIDs reads the junction for one schedule. func (st *Store) scheduleGroupIDs(ctx context.Context, scheduleID string) ([]string, error) { rows, err := st.db.QueryContext(ctx, `SELECT source_group_id FROM schedule_source_groups WHERE schedule_id = ?`, scheduleID) if err != nil { return nil, fmt.Errorf("store: read schedule junction: %w", err) } defer rows.Close() out := []string{} for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, err } out = append(out, id) } return out, rows.Err() } // SchedulesUsingGroup is the inverse — list schedule IDs that // reference a given source group. Used for retention-conflict // detection and "X schedules use this group" UI labels. func (st *Store) SchedulesUsingGroup(ctx context.Context, groupID string) ([]string, error) { rows, err := st.db.QueryContext(ctx, `SELECT schedule_id FROM schedule_source_groups WHERE source_group_id = ?`, groupID) if err != nil { return nil, fmt.Errorf("store: schedules using group: %w", err) } defer rows.Close() out := []string{} for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, err } out = append(out, id) } return out, rows.Err() } // ----- scan helpers -------------------------------------------------- func scanSchedule(row *sql.Row) (*Schedule, error) { return scanScheduleRow(row) } type scheduleScanner interface { Scan(dest ...any) error } func scanScheduleRow(s scheduleScanner) (*Schedule, error) { var ( out Schedule createdAt, updatedAt string enabled int ) err := s.Scan(&out.ID, &out.HostID, &out.CronExpr, &enabled, &createdAt, &updatedAt) if err != nil { return nil, err } out.Enabled = enabled != 0 if t, err := time.Parse(time.RFC3339Nano, createdAt); err == nil { out.CreatedAt = t } if t, err := time.Parse(time.RFC3339Nano, updatedAt); err == nil { out.UpdatedAt = t } return &out, nil } func boolToInt(b bool) int { if b { return 1 } return 0 }