package store

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"
)

// hostRepoStatsByHostQuery selects the host_repo_stats row of one host.
// The column order has to match the Scan call in scanHostRepoStats.
const hostRepoStatsByHostQuery = "SELECT host_id, total_size_bytes, raw_size_bytes, unique_files, " +
	"snapshot_count, last_check_at, last_check_status, lock_present, " +
	"last_prune_at, last_prune_freed_bytes, updated_at " +
	"FROM host_repo_stats WHERE host_id = ?"

// HostRepoStats holds the repo-level metrics tracked for a single host.
// Every pointer field is optional: a nil pointer means the value is
// "not yet known". Rows are written by UpsertHostRepoStats, which
// overlays only the non-nil fields of a patch onto the stored row.
type HostRepoStats struct {
	HostID              string
	TotalSizeBytes      *int64
	RawSizeBytes        *int64
	UniqueFiles         *int64
	SnapshotCount       *int64
	LastCheckAt         *time.Time
	LastCheckStatus     string // "" | "ok" | "errors_found" | "failed"
	LockPresent         *bool
	LastPruneAt         *time.Time
	LastPruneFreedBytes *int64
	UpdatedAt           time.Time
}

// GetHostRepoStats looks up the stats row for hostID on the store's
// database handle. It returns (nil, ErrNotFound) when no row exists.
func (s *Store) GetHostRepoStats(ctx context.Context, hostID string) (*HostRepoStats, error) {
	return scanHostRepoStats(s.db.QueryRowContext(ctx, hostRepoStatsByHostQuery, hostID))
}

// getHostRepoStatsTx performs the same lookup as GetHostRepoStats, but
// issues the query on the caller's transaction so that
// UpsertHostRepoStats can do its fetch-merge-upsert inside one tx.
func getHostRepoStatsTx(ctx context.Context, tx *sql.Tx, hostID string) (*HostRepoStats, error) {
	return scanHostRepoStats(tx.QueryRowContext(ctx, hostRepoStatsByHostQuery, hostID))
}

// scanHostRepoStats scans one row from host_repo_stats.
func scanHostRepoStats(row *sql.Row) (*HostRepoStats, error) { var ( st HostRepoStats totalSize sql.NullInt64 rawSize sql.NullInt64 uniqueFiles sql.NullInt64 snapshotCount sql.NullInt64 lastCheckAt sql.NullString lastCheckStatus sql.NullString lockPresent int64 lastPruneAt sql.NullString lastPruneFreed sql.NullInt64 updatedAt string ) if err := row.Scan( &st.HostID, &totalSize, &rawSize, &uniqueFiles, &snapshotCount, &lastCheckAt, &lastCheckStatus, &lockPresent, &lastPruneAt, &lastPruneFreed, &updatedAt, ); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } return nil, fmt.Errorf("store: scan host_repo_stats: %w", err) } if totalSize.Valid { v := totalSize.Int64 st.TotalSizeBytes = &v } if rawSize.Valid { v := rawSize.Int64 st.RawSizeBytes = &v } if uniqueFiles.Valid { v := uniqueFiles.Int64 st.UniqueFiles = &v } if snapshotCount.Valid { v := snapshotCount.Int64 st.SnapshotCount = &v } if lastCheckAt.Valid { t, err := time.Parse(time.RFC3339Nano, lastCheckAt.String) if err != nil { return nil, fmt.Errorf("store: parse last_check_at: %w", err) } st.LastCheckAt = &t } if lastCheckStatus.Valid { st.LastCheckStatus = lastCheckStatus.String } lp := lockPresent != 0 st.LockPresent = &lp if lastPruneAt.Valid { t, err := time.Parse(time.RFC3339Nano, lastPruneAt.String) if err != nil { return nil, fmt.Errorf("store: parse last_prune_at: %w", err) } st.LastPruneAt = &t } if lastPruneFreed.Valid { v := lastPruneFreed.Int64 st.LastPruneFreedBytes = &v } t, err := time.Parse(time.RFC3339Nano, updatedAt) if err != nil { return nil, fmt.Errorf("store: parse host_repo_stats.updated_at: %w", err) } st.UpdatedAt = t return &st, nil } // UpsertHostRepoStats writes a partial update — only non-nil pointer // fields (and LastCheckStatus when non-empty) overwrite existing // columns. Wrapped in a transaction so concurrent upserts on the same // host don't lose updates. 
func (s *Store) UpsertHostRepoStats(ctx context.Context, hostID string, patch HostRepoStats) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("store: begin host_repo_stats tx: %w", err) } defer func() { _ = tx.Rollback() }() // Fetch existing row; start from zero if absent. cur, err := getHostRepoStatsTx(ctx, tx, hostID) if err != nil && !errors.Is(err, ErrNotFound) { return err } if cur == nil { cur = &HostRepoStats{HostID: hostID} } // Merge: non-nil patch fields overwrite current. if patch.TotalSizeBytes != nil { cur.TotalSizeBytes = patch.TotalSizeBytes } if patch.RawSizeBytes != nil { cur.RawSizeBytes = patch.RawSizeBytes } if patch.UniqueFiles != nil { cur.UniqueFiles = patch.UniqueFiles } if patch.SnapshotCount != nil { cur.SnapshotCount = patch.SnapshotCount } if patch.LastCheckAt != nil { cur.LastCheckAt = patch.LastCheckAt } if patch.LastCheckStatus != "" { cur.LastCheckStatus = patch.LastCheckStatus } if patch.LockPresent != nil { cur.LockPresent = patch.LockPresent } if patch.LastPruneAt != nil { cur.LastPruneAt = patch.LastPruneAt } if patch.LastPruneFreedBytes != nil { cur.LastPruneFreedBytes = patch.LastPruneFreedBytes } now := time.Now().UTC().Format(time.RFC3339Nano) // Convert *bool → int for lock_present. var lockPresentInt int64 if cur.LockPresent != nil && *cur.LockPresent { lockPresentInt = 1 } if _, err = tx.ExecContext(ctx, `INSERT INTO host_repo_stats (host_id, total_size_bytes, raw_size_bytes, unique_files, snapshot_count, last_check_at, last_check_status, lock_present, last_prune_at, last_prune_freed_bytes, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(host_id) DO UPDATE SET total_size_bytes = excluded.total_size_bytes, raw_size_bytes = excluded.raw_size_bytes, unique_files = excluded.unique_files, snapshot_count = excluded.snapshot_count, last_check_at = excluded.last_check_at, last_check_status = excluded.last_check_status, lock_present = excluded.lock_present, last_prune_at = excluded.last_prune_at, last_prune_freed_bytes = excluded.last_prune_freed_bytes, updated_at = excluded.updated_at`, hostID, nullableInt64(cur.TotalSizeBytes), nullableInt64(cur.RawSizeBytes), nullableInt64(cur.UniqueFiles), nullableInt64(cur.SnapshotCount), nullableTime(cur.LastCheckAt), nullableStr(cur.LastCheckStatus), lockPresentInt, nullableTime(cur.LastPruneAt), nullableInt64(cur.LastPruneFreedBytes), now, ); err != nil { return fmt.Errorf("store: upsert host_repo_stats: %w", err) } // Project total_size_bytes onto the dashboard's host row so the // "Repo size" column and FleetSummary.SUM(repo_size_bytes) stay in // sync with the latest report. We only write a non-nil size — a // patch that doesn't carry a size (e.g. a prune-only ack) leaves // the prior row value alone. if cur.TotalSizeBytes != nil { if _, err = tx.ExecContext(ctx, `UPDATE hosts SET repo_size_bytes = ? WHERE id = ?`, *cur.TotalSizeBytes, hostID, ); err != nil { return fmt.Errorf("store: project repo_size_bytes onto hosts row: %w", err) } } return tx.Commit() } // nullableInt64 converts *int64 to a database/sql-compatible nullable value. func nullableInt64(p *int64) any { if p == nil { return nil } return *p } // nullableTime converts *time.Time to an RFC3339Nano string or nil. func nullableTime(p *time.Time) any { if p == nil { return nil } return p.UTC().Format(time.RFC3339Nano) }