Chunk enrichment: prepend document title to embeddings

Adds an enriched_text column to the chunks table that stores the chunk text
with the document title (and section header, when present) prepended.
Embeddings and FTS now use the enriched text for better search relevance.
Includes a schema migration with backfill for existing data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-29 21:03:48 +01:00
parent 5f9946efc9
commit b2176c36ea
10 changed files with 278 additions and 21 deletions
+70 -7
View File
@@ -10,6 +10,60 @@ import struct
from typing import Any, Optional
def build_enriched_text(title: str, chunk_text: str, metadata: dict | None = None) -> str:
"""Build enriched text by prepending document title and optional section header.
Format: "{title} > {section_header}\\n\\n{chunk_text}" or "{title}\\n\\n{chunk_text}".
"""
section_header = (metadata or {}).get("section_header")
if section_header:
return f"{title} > {section_header}\n\n{chunk_text}"
return f"{title}\n\n{chunk_text}"
def _backfill_enriched_text(conn: sqlite3.Connection) -> None:
    """Backfill enriched_text for all existing chunks.

    Every chunk is joined to its document, its enriched text is computed with
    build_enriched_text() from the document title, the chunk text and the
    chunk's stored metadata, and the result is written to chunks.enriched_text.

    Rows are read by column name, so the connection must use a row factory
    that supports it (such as sqlite3.Row). This function does not commit;
    the caller decides when the updates become permanent.

    Args:
        conn: Open connection to the database holding the chunks and
            documents tables.
    """
    rows = conn.execute(
        "SELECT c.id, c.text, c.metadata, d.title "
        "FROM chunks c JOIN documents d ON c.document_id = d.id"
    ).fetchall()

    updates = []
    for row in rows:
        metadata = None
        if row["metadata"]:
            try:
                metadata = json.loads(row["metadata"])
            except (ValueError, TypeError):
                # One unparsable metadata value must not abort the whole
                # backfill; that chunk is enriched without a section header.
                metadata = None
            if not isinstance(metadata, dict):
                # Valid JSON that is not an object carries no section header.
                metadata = None
        enriched = build_enriched_text(row["title"], row["text"], metadata)
        updates.append((enriched, row["id"]))

    # A single batched statement instead of one execute() call per chunk.
    conn.executemany("UPDATE chunks SET enriched_text = ? WHERE id = ?", updates)
def _rebuild_fts(conn: sqlite3.Connection) -> None:
"""Drop and recreate chunks_fts to index enriched_text, with updated triggers."""
conn.executescript("""
DROP TRIGGER IF EXISTS chunks_ai;
DROP TRIGGER IF EXISTS chunks_ad;
DROP TRIGGER IF EXISTS chunks_au;
DROP TABLE IF EXISTS chunks_fts;
CREATE VIRTUAL TABLE chunks_fts USING fts5(
text,
content=chunks,
content_rowid=id
);
CREATE TRIGGER chunks_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TRIGGER chunks_ad AFTER DELETE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
END;
CREATE TRIGGER chunks_au AFTER UPDATE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
""")
# Repopulate FTS from existing enriched_text
conn.execute("INSERT INTO chunks_fts(rowid, text) SELECT id, enriched_text FROM chunks")
def get_connection(db_path: str) -> sqlite3.Connection:
"""Return a sqlite3 connection with WAL mode, Row factory, and foreign keys enabled."""
import sqlite_vec
@@ -44,6 +98,7 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER,
text TEXT,
enriched_text TEXT,
token_count INTEGER,
metadata TEXT DEFAULT '{{}}',
UNIQUE(document_id, chunk_index)
@@ -55,18 +110,18 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
content_rowid=id
);
-- Triggers to keep FTS index in sync with chunks table
-- Triggers to keep FTS index in sync with chunks table (using enriched_text)
CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.text);
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.text);
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TABLE IF NOT EXISTS tags (
@@ -123,6 +178,13 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
if "original_filename" not in doc_cols:
conn.execute("ALTER TABLE documents ADD COLUMN original_filename TEXT")
# Migrate: add enriched_text to chunks and rebuild FTS to index it
chunk_cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
if "enriched_text" not in chunk_cols:
conn.execute("ALTER TABLE chunks ADD COLUMN enriched_text TEXT")
_backfill_enriched_text(conn)
_rebuild_fts(conn)
conn.commit()
@@ -205,6 +267,7 @@ def insert_chunk(
document_id: int,
chunk_index: int,
text: str,
enriched_text: str | None = None,
token_count: Optional[int] = None,
metadata: Any = None,
) -> int:
@@ -217,8 +280,8 @@ def insert_chunk(
metadata_str = str(metadata)
cur = conn.execute(
"INSERT INTO chunks(document_id, chunk_index, text, token_count, metadata) VALUES (?, ?, ?, ?, ?)",
(document_id, chunk_index, text, token_count, metadata_str),
"INSERT INTO chunks(document_id, chunk_index, text, enriched_text, token_count, metadata) VALUES (?, ?, ?, ?, ?, ?)",
(document_id, chunk_index, text, enriched_text or text, token_count, metadata_str),
)
conn.commit()
return cur.lastrowid
+3 -3
View File
@@ -19,10 +19,10 @@ async def reindex():
conn = get_connection(cfg.db_path)
try:
# Fetch all chunks
rows = conn.execute("SELECT id, text FROM chunks ORDER BY id").fetchall()
# Fetch all chunks — use enriched_text for embedding (includes title context)
rows = conn.execute("SELECT id, enriched_text FROM chunks ORDER BY id").fetchall()
chunk_ids = [row["id"] for row in rows]
chunk_texts = [row["text"] for row in rows]
chunk_texts = [row["enriched_text"] or "" for row in rows]
logger.info("Reindexing %d chunks with model '%s'", len(chunk_ids), cfg.model)
+19 -8
View File
@@ -8,6 +8,7 @@ import shutil
from pathlib import Path
from kb import config, database, embeddings, staging
from kb.database import build_enriched_text
from kb.ingest import detector
logger = logging.getLogger("kb.worker")
@@ -146,20 +147,30 @@ def _process_job(job_row) -> tuple[str, int | None, int]:
)
chunk_texts = [c if isinstance(c, str) else c["text"] for c in chunks]
vectors = embeddings.embed_texts(chunk_texts)
chunk_metas = []
for idx, c in enumerate(chunks):
if isinstance(c, str):
chunk_metas.append(None)
else:
meta = {k: v for k, v in c.items() if k != "text"} or None
chunk_metas.append(meta)
for idx, (chunk_text, vector) in enumerate(zip(chunk_texts, vectors)):
metadata = None
if not isinstance(chunks[idx], str):
metadata = {
k: v for k, v in chunks[idx].items() if k != "text"
} or None
enriched_texts = [
build_enriched_text(title, ct, cm)
for ct, cm in zip(chunk_texts, chunk_metas)
]
vectors = embeddings.embed_texts(enriched_texts)
for idx, (chunk_text, enriched, vector) in enumerate(
zip(chunk_texts, enriched_texts, vectors)
):
chunk_id = database.insert_chunk(
conn,
document_id=doc_id,
chunk_index=idx,
text=chunk_text,
metadata=metadata,
enriched_text=enriched,
metadata=chunk_metas[idx],
)
database.insert_embedding(conn, chunk_id, vector)