feat(store): seen-set read state with floor baseline and compaction
This commit is contained in:
@@ -0,0 +1,154 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import "database/sql"
|
||||||
|
|
||||||
|
// EnsureFolderBaseline initialises folder_state on first contact, or resets it
|
||||||
|
// when the server's UIDVALIDITY differs from what we stored.
|
||||||
|
func (s *Store) EnsureFolderBaseline(account, folder string, uidvalidity, maxUID uint32) error {
|
||||||
|
a, err := s.GetAccount(account)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
curUIDValidity uint32
|
||||||
|
haveRow bool
|
||||||
|
)
|
||||||
|
row := s.db.QueryRow(
|
||||||
|
"SELECT uidvalidity FROM folder_state WHERE account_id=? AND folder=?", a.ID, folder)
|
||||||
|
switch err := row.Scan(&curUIDValidity); err {
|
||||||
|
case nil:
|
||||||
|
haveRow = true
|
||||||
|
case sql.ErrNoRows:
|
||||||
|
haveRow = false
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if haveRow && curUIDValidity == uidvalidity {
|
||||||
|
return nil // already baselined, same validity
|
||||||
|
}
|
||||||
|
|
||||||
|
floor := maxUID
|
||||||
|
if a.ProcessBacklog {
|
||||||
|
floor = 0
|
||||||
|
}
|
||||||
|
tx, err := s.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
if _, err := tx.Exec("DELETE FROM acked WHERE account_id=? AND folder=?", a.ID, folder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(`
|
||||||
|
INSERT INTO folder_state(account_id,folder,uidvalidity,floor_uid)
|
||||||
|
VALUES(?,?,?,?)
|
||||||
|
ON CONFLICT(account_id,folder) DO UPDATE SET uidvalidity=excluded.uidvalidity, floor_uid=excluded.floor_uid`,
|
||||||
|
a.ID, folder, uidvalidity, floor); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) floor(account, folder string) (uint32, error) {
|
||||||
|
id, err := s.accountID(account)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
var f uint32
|
||||||
|
err = s.db.QueryRow(
|
||||||
|
"SELECT floor_uid FROM folder_state WHERE account_id=? AND folder=?", id, folder).Scan(&f)
|
||||||
|
return f, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNew reports whether uid is unread: above the floor and not acked.
|
||||||
|
func (s *Store) IsNew(account, folder string, uid uint32) (bool, error) {
|
||||||
|
id, err := s.accountID(account)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
var floor uint32
|
||||||
|
if err := s.db.QueryRow(
|
||||||
|
"SELECT floor_uid FROM folder_state WHERE account_id=? AND folder=?", id, folder).Scan(&floor); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if uid <= floor {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
var one int
|
||||||
|
err = s.db.QueryRow(
|
||||||
|
"SELECT 1 FROM acked WHERE account_id=? AND folder=? AND uid=?", id, folder, uid).Scan(&one)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return false, nil // present in acked
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterNew returns the subset of uids that are new, preserving order.
|
||||||
|
func (s *Store) FilterNew(account, folder string, uids []uint32) ([]uint32, error) {
|
||||||
|
out := make([]uint32, 0, len(uids))
|
||||||
|
for _, u := range uids {
|
||||||
|
n, err := s.IsNew(account, folder, u)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if n {
|
||||||
|
out = append(out, u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack records uids as processed (ignoring any at or below the floor) then compacts.
|
||||||
|
func (s *Store) Ack(account, folder string, uidvalidity uint32, uids ...uint32) error {
|
||||||
|
id, err := s.accountID(account)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tx, err := s.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
var floor uint32
|
||||||
|
if err := tx.QueryRow(
|
||||||
|
"SELECT floor_uid FROM folder_state WHERE account_id=? AND folder=?", id, folder).Scan(&floor); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, u := range uids {
|
||||||
|
if u <= floor {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(
|
||||||
|
"INSERT OR IGNORE INTO acked(account_id,folder,uidvalidity,uid) VALUES(?,?,?,?)",
|
||||||
|
id, folder, uidvalidity, u); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Compact: while floor+1 is acked, advance floor and drop that row.
|
||||||
|
for {
|
||||||
|
next := floor + 1
|
||||||
|
var present int
|
||||||
|
err := tx.QueryRow(
|
||||||
|
"SELECT 1 FROM acked WHERE account_id=? AND folder=? AND uid=?", id, folder, next).Scan(&present)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(
|
||||||
|
"DELETE FROM acked WHERE account_id=? AND folder=? AND uid=?", id, folder, next); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
floor = next
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(
|
||||||
|
"UPDATE folder_state SET floor_uid=? WHERE account_id=? AND folder=?", floor, id, folder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
@@ -0,0 +1,107 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func backlogAccount(t *testing.T, s *Store, backlog bool) {
|
||||||
|
t.Helper()
|
||||||
|
a := sampleAccount()
|
||||||
|
a.ProcessBacklog = backlog
|
||||||
|
if _, err := s.AddAccount(a); err != nil {
|
||||||
|
t.Fatalf("AddAccount: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBaselineIgnoresHistoryByDefault(t *testing.T) {
|
||||||
|
s := openTemp(t)
|
||||||
|
backlogAccount(t, s, false)
|
||||||
|
if err := s.EnsureFolderBaseline("work", "INBOX", 1, 100); err != nil {
|
||||||
|
t.Fatalf("baseline: %v", err)
|
||||||
|
}
|
||||||
|
// Existing mail (uid <= 100) is not new; 101 is.
|
||||||
|
for _, tc := range []struct {
|
||||||
|
uid uint32
|
||||||
|
want bool
|
||||||
|
}{{50, false}, {100, false}, {101, true}} {
|
||||||
|
got, _ := s.IsNew("work", "INBOX", tc.uid)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Fatalf("IsNew(%d)=%v want %v", tc.uid, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBaselineBacklogProcessesAll(t *testing.T) {
|
||||||
|
s := openTemp(t)
|
||||||
|
backlogAccount(t, s, true)
|
||||||
|
_ = s.EnsureFolderBaseline("work", "INBOX", 1, 100)
|
||||||
|
if n, _ := s.IsNew("work", "INBOX", 1); !n {
|
||||||
|
t.Fatal("with backlog, uid 1 must be new")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAckRemovesFromNewAndIsOutOfOrderSafe(t *testing.T) {
|
||||||
|
s := openTemp(t)
|
||||||
|
backlogAccount(t, s, false)
|
||||||
|
_ = s.EnsureFolderBaseline("work", "INBOX", 1, 100)
|
||||||
|
// New mail arrives: 101..105. Ack out of order: 103 then 101.
|
||||||
|
_ = s.Ack("work", "INBOX", 1, 103)
|
||||||
|
if n, _ := s.IsNew("work", "INBOX", 103); n {
|
||||||
|
t.Fatal("103 acked, must not be new")
|
||||||
|
}
|
||||||
|
if n, _ := s.IsNew("work", "INBOX", 101); !n {
|
||||||
|
t.Fatal("101 not acked yet, must still be new")
|
||||||
|
}
|
||||||
|
_ = s.Ack("work", "INBOX", 1, 101, 102)
|
||||||
|
got, _ := s.FilterNew("work", "INBOX", []uint32{101, 102, 103, 104, 105})
|
||||||
|
want := []uint32{104, 105}
|
||||||
|
if len(got) != 2 || got[0] != want[0] || got[1] != want[1] {
|
||||||
|
t.Fatalf("FilterNew got %v want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompactionAdvancesFloor(t *testing.T) {
|
||||||
|
s := openTemp(t)
|
||||||
|
backlogAccount(t, s, false)
|
||||||
|
_ = s.EnsureFolderBaseline("work", "INBOX", 1, 100)
|
||||||
|
// Ack a contiguous run just above the floor: 101,102,103.
|
||||||
|
_ = s.Ack("work", "INBOX", 1, 102, 101, 103)
|
||||||
|
f, _ := s.floor("work", "INBOX")
|
||||||
|
if f != 103 {
|
||||||
|
t.Fatalf("floor should advance to 103, got %d", f)
|
||||||
|
}
|
||||||
|
// The acked rows for the collapsed run are gone.
|
||||||
|
var n int
|
||||||
|
_ = s.db.QueryRow("SELECT COUNT(*) FROM acked").Scan(&n)
|
||||||
|
if n != 0 {
|
||||||
|
t.Fatalf("acked rows should be compacted away, got %d", n)
|
||||||
|
}
|
||||||
|
// A hole remains uncompacted: ack 105 (gap at 104).
|
||||||
|
_ = s.Ack("work", "INBOX", 1, 105)
|
||||||
|
f, _ = s.floor("work", "INBOX")
|
||||||
|
if f != 103 {
|
||||||
|
t.Fatalf("floor must stay at 103 while 104 is a hole, got %d", f)
|
||||||
|
}
|
||||||
|
_ = s.db.QueryRow("SELECT COUNT(*) FROM acked").Scan(&n)
|
||||||
|
if n != 1 {
|
||||||
|
t.Fatalf("105 should remain in acked, got %d rows", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUIDValidityChangeResets(t *testing.T) {
|
||||||
|
s := openTemp(t)
|
||||||
|
backlogAccount(t, s, false)
|
||||||
|
_ = s.EnsureFolderBaseline("work", "INBOX", 1, 100)
|
||||||
|
_ = s.Ack("work", "INBOX", 1, 105)
|
||||||
|
// Server reports a new UIDVALIDITY: state resets, re-baselines at new max.
|
||||||
|
if err := s.EnsureFolderBaseline("work", "INBOX", 2, 10); err != nil {
|
||||||
|
t.Fatalf("rebaseline: %v", err)
|
||||||
|
}
|
||||||
|
f, _ := s.floor("work", "INBOX")
|
||||||
|
if f != 10 {
|
||||||
|
t.Fatalf("floor should re-baseline to 10, got %d", f)
|
||||||
|
}
|
||||||
|
var n int
|
||||||
|
_ = s.db.QueryRow("SELECT COUNT(*) FROM acked").Scan(&n)
|
||||||
|
if n != 0 {
|
||||||
|
t.Fatalf("acked must reset on uidvalidity change, got %d", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user