store: HostRepoStats projection (size, lock, last-check, last-prune)
This commit is contained in:
@@ -0,0 +1,215 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HostRepoStats is the per-host projection of repo-level metrics.
|
||||||
|
// All pointer fields are nullable; nil means "not yet known." The row
|
||||||
|
// is created (or replaced) by UpsertHostRepoStats which merges in only
|
||||||
|
// the non-nil fields from a patch.
|
||||||
|
type HostRepoStats struct {
|
||||||
|
HostID string
|
||||||
|
TotalSizeBytes *int64
|
||||||
|
RawSizeBytes *int64
|
||||||
|
UniqueFiles *int64
|
||||||
|
SnapshotCount *int64
|
||||||
|
LastCheckAt *time.Time
|
||||||
|
LastCheckStatus string // "" | "ok" | "errors_found" | "failed"
|
||||||
|
LockPresent *bool
|
||||||
|
LastPruneAt *time.Time
|
||||||
|
LastPruneFreedBytes *int64
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetHostRepoStats returns the row, or (nil, ErrNotFound) if absent.
|
||||||
|
func (s *Store) GetHostRepoStats(ctx context.Context, hostID string) (*HostRepoStats, error) {
|
||||||
|
row := s.db.QueryRowContext(ctx,
|
||||||
|
`SELECT host_id, total_size_bytes, raw_size_bytes, unique_files,
|
||||||
|
snapshot_count, last_check_at, last_check_status,
|
||||||
|
lock_present, last_prune_at, last_prune_freed_bytes, updated_at
|
||||||
|
FROM host_repo_stats WHERE host_id = ?`, hostID)
|
||||||
|
return scanHostRepoStats(row)
|
||||||
|
}
|
||||||
|
|
||||||
|
// scanHostRepoStats scans one row from host_repo_stats.
|
||||||
|
func scanHostRepoStats(row *sql.Row) (*HostRepoStats, error) {
|
||||||
|
var (
|
||||||
|
st HostRepoStats
|
||||||
|
totalSize sql.NullInt64
|
||||||
|
rawSize sql.NullInt64
|
||||||
|
uniqueFiles sql.NullInt64
|
||||||
|
snapshotCount sql.NullInt64
|
||||||
|
lastCheckAt sql.NullString
|
||||||
|
lastCheckStatus sql.NullString
|
||||||
|
lockPresent int64
|
||||||
|
lastPruneAt sql.NullString
|
||||||
|
lastPruneFreed sql.NullInt64
|
||||||
|
updatedAt string
|
||||||
|
)
|
||||||
|
if err := row.Scan(
|
||||||
|
&st.HostID,
|
||||||
|
&totalSize, &rawSize, &uniqueFiles, &snapshotCount,
|
||||||
|
&lastCheckAt, &lastCheckStatus,
|
||||||
|
&lockPresent,
|
||||||
|
&lastPruneAt, &lastPruneFreed,
|
||||||
|
&updatedAt,
|
||||||
|
); err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("store: scan host_repo_stats: %w", err)
|
||||||
|
}
|
||||||
|
if totalSize.Valid {
|
||||||
|
v := totalSize.Int64
|
||||||
|
st.TotalSizeBytes = &v
|
||||||
|
}
|
||||||
|
if rawSize.Valid {
|
||||||
|
v := rawSize.Int64
|
||||||
|
st.RawSizeBytes = &v
|
||||||
|
}
|
||||||
|
if uniqueFiles.Valid {
|
||||||
|
v := uniqueFiles.Int64
|
||||||
|
st.UniqueFiles = &v
|
||||||
|
}
|
||||||
|
if snapshotCount.Valid {
|
||||||
|
v := snapshotCount.Int64
|
||||||
|
st.SnapshotCount = &v
|
||||||
|
}
|
||||||
|
if lastCheckAt.Valid {
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, lastCheckAt.String)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse last_check_at: %w", err)
|
||||||
|
}
|
||||||
|
st.LastCheckAt = &t
|
||||||
|
}
|
||||||
|
if lastCheckStatus.Valid {
|
||||||
|
st.LastCheckStatus = lastCheckStatus.String
|
||||||
|
}
|
||||||
|
lp := lockPresent != 0
|
||||||
|
st.LockPresent = &lp
|
||||||
|
if lastPruneAt.Valid {
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, lastPruneAt.String)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse last_prune_at: %w", err)
|
||||||
|
}
|
||||||
|
st.LastPruneAt = &t
|
||||||
|
}
|
||||||
|
if lastPruneFreed.Valid {
|
||||||
|
v := lastPruneFreed.Int64
|
||||||
|
st.LastPruneFreedBytes = &v
|
||||||
|
}
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, updatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store: parse host_repo_stats.updated_at: %w", err)
|
||||||
|
}
|
||||||
|
st.UpdatedAt = t
|
||||||
|
return &st, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpsertHostRepoStats writes a partial update — only non-nil pointer
|
||||||
|
// fields (and LastCheckStatus when non-empty) overwrite existing
|
||||||
|
// columns. Implemented as a row-fetch + merge + INSERT…ON CONFLICT so
|
||||||
|
// each call is atomic at the application level (sufficient for a
|
||||||
|
// single-writer server).
|
||||||
|
func (s *Store) UpsertHostRepoStats(ctx context.Context, hostID string, patch HostRepoStats) error {
|
||||||
|
// Fetch existing row; start from zero if absent.
|
||||||
|
cur, err := s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if err != nil && !errors.Is(err, ErrNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cur == nil {
|
||||||
|
cur = &HostRepoStats{HostID: hostID}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge: non-nil patch fields overwrite current.
|
||||||
|
if patch.TotalSizeBytes != nil {
|
||||||
|
cur.TotalSizeBytes = patch.TotalSizeBytes
|
||||||
|
}
|
||||||
|
if patch.RawSizeBytes != nil {
|
||||||
|
cur.RawSizeBytes = patch.RawSizeBytes
|
||||||
|
}
|
||||||
|
if patch.UniqueFiles != nil {
|
||||||
|
cur.UniqueFiles = patch.UniqueFiles
|
||||||
|
}
|
||||||
|
if patch.SnapshotCount != nil {
|
||||||
|
cur.SnapshotCount = patch.SnapshotCount
|
||||||
|
}
|
||||||
|
if patch.LastCheckAt != nil {
|
||||||
|
cur.LastCheckAt = patch.LastCheckAt
|
||||||
|
}
|
||||||
|
if patch.LastCheckStatus != "" {
|
||||||
|
cur.LastCheckStatus = patch.LastCheckStatus
|
||||||
|
}
|
||||||
|
if patch.LockPresent != nil {
|
||||||
|
cur.LockPresent = patch.LockPresent
|
||||||
|
}
|
||||||
|
if patch.LastPruneAt != nil {
|
||||||
|
cur.LastPruneAt = patch.LastPruneAt
|
||||||
|
}
|
||||||
|
if patch.LastPruneFreedBytes != nil {
|
||||||
|
cur.LastPruneFreedBytes = patch.LastPruneFreedBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UTC().Format(time.RFC3339Nano)
|
||||||
|
|
||||||
|
// Convert *bool → int for lock_present.
|
||||||
|
var lockPresentInt int64
|
||||||
|
if cur.LockPresent != nil && *cur.LockPresent {
|
||||||
|
lockPresentInt = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.db.ExecContext(ctx,
|
||||||
|
`INSERT INTO host_repo_stats
|
||||||
|
(host_id, total_size_bytes, raw_size_bytes, unique_files,
|
||||||
|
snapshot_count, last_check_at, last_check_status,
|
||||||
|
lock_present, last_prune_at, last_prune_freed_bytes, updated_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT(host_id) DO UPDATE SET
|
||||||
|
total_size_bytes = excluded.total_size_bytes,
|
||||||
|
raw_size_bytes = excluded.raw_size_bytes,
|
||||||
|
unique_files = excluded.unique_files,
|
||||||
|
snapshot_count = excluded.snapshot_count,
|
||||||
|
last_check_at = excluded.last_check_at,
|
||||||
|
last_check_status = excluded.last_check_status,
|
||||||
|
lock_present = excluded.lock_present,
|
||||||
|
last_prune_at = excluded.last_prune_at,
|
||||||
|
last_prune_freed_bytes = excluded.last_prune_freed_bytes,
|
||||||
|
updated_at = excluded.updated_at`,
|
||||||
|
hostID,
|
||||||
|
nullableInt64(cur.TotalSizeBytes),
|
||||||
|
nullableInt64(cur.RawSizeBytes),
|
||||||
|
nullableInt64(cur.UniqueFiles),
|
||||||
|
nullableInt64(cur.SnapshotCount),
|
||||||
|
nullableTime(cur.LastCheckAt),
|
||||||
|
nullableStr(cur.LastCheckStatus),
|
||||||
|
lockPresentInt,
|
||||||
|
nullableTime(cur.LastPruneAt),
|
||||||
|
nullableInt64(cur.LastPruneFreedBytes),
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("store: upsert host_repo_stats: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// nullableInt64 converts *int64 to a database/sql-compatible nullable value.
|
||||||
|
func nullableInt64(p *int64) any {
|
||||||
|
if p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return *p
|
||||||
|
}
|
||||||
|
|
||||||
|
// nullableTime converts *time.Time to an RFC3339Nano string or nil.
|
||||||
|
func nullableTime(p *time.Time) any {
|
||||||
|
if p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return p.UTC().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
@@ -0,0 +1,131 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func int64ptr(v int64) *int64 { return &v }
|
||||||
|
func boolptr(v bool) *bool { return &v }
|
||||||
|
|
||||||
|
func TestHostRepoStatsRoundTrip(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
s := openTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
const hostID = "h-stats-test"
|
||||||
|
seedHost(t, s, hostID)
|
||||||
|
|
||||||
|
// 1. Initial upsert: set TotalSizeBytes only.
|
||||||
|
if err := s.UpsertHostRepoStats(ctx, hostID, HostRepoStats{
|
||||||
|
TotalSizeBytes: int64ptr(100),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("upsert 1: %v", err)
|
||||||
|
}
|
||||||
|
got, err := s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get after upsert 1: %v", err)
|
||||||
|
}
|
||||||
|
if got.TotalSizeBytes == nil || *got.TotalSizeBytes != 100 {
|
||||||
|
t.Errorf("TotalSizeBytes: want 100, got %v", got.TotalSizeBytes)
|
||||||
|
}
|
||||||
|
if got.LastCheckStatus != "" {
|
||||||
|
t.Errorf("LastCheckStatus should be empty after first upsert, got %q", got.LastCheckStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Upsert with LastCheckStatus; TotalSizeBytes must be preserved.
|
||||||
|
if err := s.UpsertHostRepoStats(ctx, hostID, HostRepoStats{
|
||||||
|
LastCheckStatus: "ok",
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("upsert 2: %v", err)
|
||||||
|
}
|
||||||
|
got, err = s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get after upsert 2: %v", err)
|
||||||
|
}
|
||||||
|
if got.TotalSizeBytes == nil || *got.TotalSizeBytes != 100 {
|
||||||
|
t.Errorf("TotalSizeBytes should still be 100 after second upsert, got %v", got.TotalSizeBytes)
|
||||||
|
}
|
||||||
|
if got.LastCheckStatus != "ok" {
|
||||||
|
t.Errorf("LastCheckStatus: want %q, got %q", "ok", got.LastCheckStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Upsert with LockPresent=true; all other fields preserved.
|
||||||
|
now := time.Now().UTC().Truncate(time.Second)
|
||||||
|
if err := s.UpsertHostRepoStats(ctx, hostID, HostRepoStats{
|
||||||
|
LockPresent: boolptr(true),
|
||||||
|
LastCheckAt: &now,
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("upsert 3: %v", err)
|
||||||
|
}
|
||||||
|
got, err = s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get after upsert 3: %v", err)
|
||||||
|
}
|
||||||
|
if got.LockPresent == nil || !*got.LockPresent {
|
||||||
|
t.Error("LockPresent should be true after upsert 3")
|
||||||
|
}
|
||||||
|
if got.TotalSizeBytes == nil || *got.TotalSizeBytes != 100 {
|
||||||
|
t.Errorf("TotalSizeBytes still 100 expected, got %v", got.TotalSizeBytes)
|
||||||
|
}
|
||||||
|
if got.LastCheckStatus != "ok" {
|
||||||
|
t.Errorf("LastCheckStatus still 'ok' expected, got %q", got.LastCheckStatus)
|
||||||
|
}
|
||||||
|
if got.LastCheckAt == nil {
|
||||||
|
t.Error("LastCheckAt should be set")
|
||||||
|
} else if !got.LastCheckAt.UTC().Truncate(time.Second).Equal(now) {
|
||||||
|
t.Errorf("LastCheckAt: got %v, want %v", got.LastCheckAt.UTC().Truncate(time.Second), now)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Clear lock (set to false).
|
||||||
|
if err := s.UpsertHostRepoStats(ctx, hostID, HostRepoStats{
|
||||||
|
LockPresent: boolptr(false),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("upsert 4: %v", err)
|
||||||
|
}
|
||||||
|
got, err = s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get after upsert 4: %v", err)
|
||||||
|
}
|
||||||
|
if got.LockPresent == nil || *got.LockPresent {
|
||||||
|
t.Error("LockPresent should be false after upsert 4")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHostRepoStatsNotFound(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
s := openTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := s.GetHostRepoStats(ctx, "no-such-host")
|
||||||
|
if !errors.Is(err, ErrNotFound) {
|
||||||
|
t.Errorf("expected ErrNotFound, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHostRepoStatsCascadeDelete(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
s := openTestStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
const hostID = "h-cascade-test"
|
||||||
|
seedHost(t, s, hostID)
|
||||||
|
|
||||||
|
if err := s.UpsertHostRepoStats(ctx, hostID, HostRepoStats{
|
||||||
|
TotalSizeBytes: int64ptr(999),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("upsert: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the host; stats row should cascade-delete.
|
||||||
|
if _, err := s.DB().ExecContext(ctx,
|
||||||
|
`DELETE FROM hosts WHERE id = ?`, hostID); err != nil {
|
||||||
|
t.Fatalf("delete host: %v", err)
|
||||||
|
}
|
||||||
|
_, err := s.GetHostRepoStats(ctx, hostID)
|
||||||
|
if !errors.Is(err, ErrNotFound) {
|
||||||
|
t.Errorf("after host delete, expected ErrNotFound for stats; got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user