Files
kb/engine/kb/database.py
T
steve b5a203d2aa Add bulk operations and remove collections abstraction
- Add bulk delete, bulk tags, and bulk set-tags engine endpoints
  (POST /api/v1/bulk/delete, /bulk/tags, /bulk/set-tags)
- Filter-based selection: by tags, doc_type, ID list, ID range
- Safety threshold (KB_BULK_SAFETY_PERCENT, default 70%) prevents
  accidental mass operations unless force=true
- Synchronous execution with audit trail via jobs table
- Add kb_bulk_delete, kb_bulk_tags, kb_bulk_set_tags MCP tools
- Add kb bulk-remove, bulk-tag, bulk-set-tags CLI commands
- Remove collection abstraction from MCP server (use tags instead)
- Remove kb_set_collection MCP tool
- Update SKILL.md, MCP.md, README.md documentation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:34:47 +01:00

505 lines
18 KiB
Python

"""Database module for kb-engine v2.
Provides SQLite database access with WAL mode, FTS5 full-text search,
and sqlite-vec vector storage for embeddings.
"""
import json
import sqlite3
import struct
from typing import Any, Optional
def build_enriched_text(title: str, chunk_text: str, metadata: dict | None = None) -> str:
"""Build enriched text by prepending document title and optional section header.
Format: "{title} > {section_header}\\n\\n{chunk_text}" or "{title}\\n\\n{chunk_text}".
"""
section_header = (metadata or {}).get("section_header")
if section_header:
return f"{title} > {section_header}\n\n{chunk_text}"
return f"{title}\n\n{chunk_text}"
def _backfill_enriched_text(conn: sqlite3.Connection) -> None:
    """Fill ``chunks.enriched_text`` for every chunk that joins to a document.

    Each chunk's JSON metadata is decoded (NULL/empty metadata counts as none),
    combined with the parent document's title via ``build_enriched_text`` and
    written back to the chunk row. Rows are accessed by column name, so the
    connection must use a name-addressable row factory. No commit is issued
    here; that is left to the caller.
    """
    select_sql = (
        "SELECT c.id, c.text, c.metadata, d.title "
        "FROM chunks c JOIN documents d ON c.document_id = d.id"
    )
    # Fetch everything up front so the UPDATEs do not run while the SELECT
    # cursor is still being iterated.
    for chunk in conn.execute(select_sql).fetchall():
        raw_metadata = chunk["metadata"]
        parsed = json.loads(raw_metadata) if raw_metadata else None
        conn.execute(
            "UPDATE chunks SET enriched_text = ? WHERE id = ?",
            (build_enriched_text(chunk["title"], chunk["text"], parsed), chunk["id"]),
        )
def _rebuild_fts(conn: sqlite3.Connection) -> None:
"""Drop and recreate chunks_fts to index enriched_text, with updated triggers."""
conn.executescript("""
DROP TRIGGER IF EXISTS chunks_ai;
DROP TRIGGER IF EXISTS chunks_ad;
DROP TRIGGER IF EXISTS chunks_au;
DROP TABLE IF EXISTS chunks_fts;
CREATE VIRTUAL TABLE chunks_fts USING fts5(
text,
content=chunks,
content_rowid=id
);
CREATE TRIGGER chunks_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TRIGGER chunks_ad AFTER DELETE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
END;
CREATE TRIGGER chunks_au AFTER UPDATE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
""")
# Repopulate FTS from existing enriched_text
conn.execute("INSERT INTO chunks_fts(rowid, text) SELECT id, enriched_text FROM chunks")
def get_connection(db_path: str) -> sqlite3.Connection:
    """Open *db_path* and return a connection configured for this module.

    The sqlite-vec extension is loaded, rows are returned as ``sqlite3.Row``,
    WAL journaling is requested and foreign-key enforcement is switched on.

    Args:
        db_path: Path handed to ``sqlite3.connect``.

    Returns:
        The configured connection.

    Raises:
        Any exception raised while loading the extension or applying the
        PRAGMAs. The partially configured connection is closed before the
        exception propagates, so a failed setup does not leak a handle.
    """
    import sqlite_vec

    conn = sqlite3.connect(db_path)
    try:
        conn.enable_load_extension(True)
        try:
            sqlite_vec.load(conn)
        finally:
            # Extension loading is only needed for sqlite-vec; switch it back
            # off even when the load itself failed.
            conn.enable_load_extension(False)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
    except Exception:
        conn.close()
        raise
    return conn
def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
    """Create all tables if they do not already exist, then apply column migrations.

    Steps, in order:
      1. Run one script that creates the documents, chunks, tags,
         document_tags, config and jobs tables, the ``chunks_fts`` FTS5 index
         and the three triggers that mirror ``chunks.enriched_text`` into it.
      2. Create the ``chunks_vec`` vec0 table if it is not listed in
         ``sqlite_master``.
      3. Add columns that older databases may lack (jobs.content_hash,
         documents.stored_path / original_filename / updated_at,
         chunks.enriched_text, jobs.job_type).
      4. Commit.

    Note that ``jobs.status`` carries a CHECK constraint limiting it to
    'queued', 'processing', 'done', 'failed' and 'skipped'.

    Args:
        conn: Open connection. The script creates vec0 and fts5 virtual
            tables, so those modules must be available on it.
        embedding_dim: Vector width interpolated into the ``chunks_vec``
            declaration; only used when that table does not exist yet.
    """
    # The doubled braces in the metadata default are f-string escapes for a literal '{}'.
    conn.executescript(f"""
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY,
title TEXT,
source_path TEXT,
content_hash TEXT UNIQUE,
doc_type TEXT,
language TEXT,
stored_path TEXT,
original_filename TEXT,
created_at TEXT DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS chunks (
id INTEGER PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER,
text TEXT,
enriched_text TEXT,
token_count INTEGER,
metadata TEXT DEFAULT '{{}}',
UNIQUE(document_id, chunk_index)
);
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
text,
content=chunks,
content_rowid=id
);
-- Triggers to keep FTS index in sync with chunks table (using enriched_text)
CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
END;
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE COLLATE NOCASE
);
CREATE TABLE IF NOT EXISTS document_tags (
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
UNIQUE(document_id, tag_id)
);
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
filename TEXT,
status TEXT DEFAULT 'queued' CHECK(status IN ('queued','processing','done','failed','skipped')),
doc_type TEXT,
tags_json TEXT DEFAULT '[]',
title TEXT,
error TEXT,
document_id INTEGER,
chunk_count INTEGER DEFAULT 0,
staging_path TEXT,
content_hash TEXT,
created_at TEXT DEFAULT current_timestamp,
completed_at TEXT
);
""")
    # sqlite-vec virtual table (cannot use IF NOT EXISTS with vec0, so check first)
    existing = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_vec'"
    ).fetchone()
    if not existing:
        conn.execute(
            f"CREATE VIRTUAL TABLE chunks_vec USING vec0(embedding float[{embedding_dim}], chunk_id integer)"
        )
    # Migrate: add content_hash to jobs if missing (added in v2.0.5)
    # PRAGMA table_info rows are indexed positionally; index 1 is the column name.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(jobs)").fetchall()}
    if "content_hash" not in cols:
        conn.execute("ALTER TABLE jobs ADD COLUMN content_hash TEXT")
    # Migrate: add stored_path and original_filename to documents if missing
    # doc_cols is a snapshot taken before any ALTER on documents; it is reused
    # for the updated_at check further down, which is safe because neither
    # ALTER in this step adds that column.
    doc_cols = {row[1] for row in conn.execute("PRAGMA table_info(documents)").fetchall()}
    if "stored_path" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN stored_path TEXT")
    if "original_filename" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN original_filename TEXT")
    # Migrate: add enriched_text to chunks and rebuild FTS to index it
    # Only reached for a chunks table that predates the column (the CREATE
    # above already includes it). Order matters: the column must exist before
    # it is backfilled, and the backfill must finish before the index is
    # rebuilt from it.
    chunk_cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
    if "enriched_text" not in chunk_cols:
        conn.execute("ALTER TABLE chunks ADD COLUMN enriched_text TEXT")
        _backfill_enriched_text(conn)
        _rebuild_fts(conn)
    # Migrate: add updated_at to documents if missing (v3.0.0)
    if "updated_at" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN updated_at TEXT")
    # Migrate: add job_type to jobs if missing (bulk operations)
    # The CREATE TABLE above does not declare job_type, so a freshly created
    # jobs table gains the column here as well.
    job_cols = {row[1] for row in conn.execute("PRAGMA table_info(jobs)").fetchall()}
    if "job_type" not in job_cols:
        conn.execute("ALTER TABLE jobs ADD COLUMN job_type TEXT DEFAULT 'ingest'")
    conn.commit()
# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------
def get_db_config(conn: sqlite3.Connection, key: str, default: Optional[str] = None) -> Optional[str]:
    """Look up *key* in the config table.

    Returns the stored value (which may itself be NULL/None) when a row for
    *key* exists, and *default* when there is no such row.
    """
    found = conn.execute("SELECT value FROM config WHERE key = ?", (key,)).fetchone()
    if found is None:
        return default
    return found["value"]
def set_db_config(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Store *value* under *key* in the config table, replacing any existing value, and commit."""
    upsert_sql = (
        "INSERT INTO config(key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    conn.execute(upsert_sql, (key, value))
    conn.commit()
# ---------------------------------------------------------------------------
# Document helpers
# ---------------------------------------------------------------------------
def hash_exists(conn: sqlite3.Connection, content_hash: str) -> bool:
    """Report whether the documents table already holds a row with this content hash."""
    lookup = conn.execute("SELECT 1 FROM documents WHERE content_hash = ?", (content_hash,))
    return lookup.fetchone() is not None
def get_document_by_hash(conn: sqlite3.Connection, content_hash: str) -> dict | None:
"""Return duplicate info for a given hash, or None.
Checks both the documents table (already ingested) and the jobs table
(queued/processing). Returns a dict with either ``document_id`` or
``job_id`` so callers can distinguish the two cases.
"""
row = conn.execute(
"SELECT id, title FROM documents WHERE content_hash = ?", (content_hash,)
).fetchone()
if row is not None:
return {"document_id": row["id"], "title": row["title"]}
# Also check pending/processing jobs that haven't been committed to documents yet
row = conn.execute(
"SELECT id, filename FROM jobs WHERE content_hash = ? AND status IN ('queued', 'processing')",
(content_hash,),
).fetchone()
if row is not None:
return {"job_id": row["id"], "title": row["filename"]}
return None
def insert_document(
    conn: sqlite3.Connection,
    title: str,
    source_path: str,
    content_hash: str,
    doc_type: str,
    language: Optional[str] = None,
) -> int:
    """Add a row to the documents table, commit, and return the new row id."""
    insert_sql = (
        "INSERT INTO documents(title, source_path, content_hash, doc_type, language) VALUES (?, ?, ?, ?, ?)"
    )
    values = (title, source_path, content_hash, doc_type, language)
    cursor = conn.execute(insert_sql, values)
    conn.commit()
    return cursor.lastrowid
# ---------------------------------------------------------------------------
# Chunk / embedding helpers
# ---------------------------------------------------------------------------
def insert_chunk(
conn: sqlite3.Connection,
document_id: int,
chunk_index: int,
text: str,
enriched_text: str | None = None,
token_count: Optional[int] = None,
metadata: Any = None,
) -> int:
"""Insert a chunk and return its id. *metadata* is JSON-serialized if it is a dict."""
if metadata is None:
metadata_str = "{}"
elif isinstance(metadata, dict):
metadata_str = json.dumps(metadata)
else:
metadata_str = str(metadata)
cur = conn.execute(
"INSERT INTO chunks(document_id, chunk_index, text, enriched_text, token_count, metadata) VALUES (?, ?, ?, ?, ?, ?)",
(document_id, chunk_index, text, enriched_text or text, token_count, metadata_str),
)
conn.commit()
return cur.lastrowid
def insert_embedding(conn: sqlite3.Connection, chunk_id: int, embedding: list[float]) -> None:
    """Store *embedding* for *chunk_id* in chunks_vec and commit.

    The vector is serialised with ``struct`` as ``len(embedding)`` consecutive
    single-precision floats (format ``"<n>f"``, native byte order).
    """
    float_count = len(embedding)
    packed_vector = struct.pack(f"{float_count}f", *embedding)
    row = (packed_vector, chunk_id)
    conn.execute("INSERT INTO chunks_vec(embedding, chunk_id) VALUES (?, ?)", row)
    conn.commit()
# ---------------------------------------------------------------------------
# Tagging helpers
# ---------------------------------------------------------------------------
def tag_document(conn: sqlite3.Connection, document_id: int, tag_names: list[str]) -> None:
    """Attach each named tag to a document, creating tags that do not exist yet.

    Existing tags and existing document/tag links are left untouched
    (``INSERT OR IGNORE``). A single commit is issued after all names have
    been processed.
    """
    for tag_name in tag_names:
        conn.execute("INSERT OR IGNORE INTO tags(name) VALUES (?)", (tag_name,))
        # Read the id back: the row may have existed before this call.
        tag_row = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()
        link = (document_id, tag_row["id"])
        conn.execute(
            "INSERT OR IGNORE INTO document_tags(document_id, tag_id) VALUES (?, ?)",
            link,
        )
    conn.commit()
def untag_document(conn: sqlite3.Connection, document_id: int, tag_names: list[str]) -> None:
    """Detach the named tags from a document and commit.

    Names that do not correspond to a row in ``tags`` are skipped. Only the
    document/tag links are deleted; the tag rows themselves are kept.
    """
    for tag_name in tag_names:
        tag_row = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()
        if tag_row is None:
            continue
        conn.execute(
            "DELETE FROM document_tags WHERE document_id = ? AND tag_id = ?",
            (document_id, tag_row["id"]),
        )
    conn.commit()
# ---------------------------------------------------------------------------
# Bulk operation helpers
# ---------------------------------------------------------------------------
def resolve_bulk_selection(
conn: sqlite3.Connection,
document_ids: list[int] | None = None,
tags: list[str] | None = None,
doc_type: str | None = None,
from_id: int | None = None,
to_id: int | None = None,
) -> list[int]:
"""Return document IDs matching the bulk selection filter.
Filters combine with AND logic. At least one filter must be provided.
"""
sql = "SELECT DISTINCT d.id FROM documents d"
joins: list[str] = []
where: list[str] = []
params: list = []
if tags:
for i, tag in enumerate(tags):
joins.append(f"JOIN document_tags dt{i} ON d.id = dt{i}.document_id")
joins.append(f"JOIN tags t{i} ON dt{i}.tag_id = t{i}.id")
where.append(f"t{i}.name = ?")
params.append(tag)
if doc_type:
where.append("d.doc_type = ?")
params.append(doc_type)
if document_ids:
placeholders = ",".join("?" for _ in document_ids)
where.append(f"d.id IN ({placeholders})")
params.extend(document_ids)
if from_id is not None:
where.append("d.id >= ?")
params.append(from_id)
if to_id is not None:
where.append("d.id <= ?")
params.append(to_id)
if joins:
sql += " " + " ".join(joins)
if where:
sql += " WHERE " + " AND ".join(where)
rows = conn.execute(sql, params).fetchall()
return [row["id"] for row in rows]
def create_bulk_job(
    conn: sqlite3.Connection,
    job_type: str,
    filters_json: str,
    matched: int,
    succeeded: int,
    failed: int,
    errors_json: str = "[]",
) -> int:
    """Create an audit log entry for a bulk operation and return its id.

    The jobs table is reused for the audit trail, so several columns are
    repurposed: ``filename`` holds the JSON filter description,
    ``document_id`` holds the number of matched documents, ``chunk_count``
    holds the number that succeeded, and ``error`` holds *errors_json* (only
    when ``failed > 0``). ``completed_at`` is stamped immediately.

    Args:
        conn: Open connection; the insert is committed before returning.
        job_type: Value for ``jobs.job_type``.
        filters_json: JSON text describing the selection filter.
        matched: Number of documents the filter selected.
        succeeded: Number of documents processed successfully.
        failed: Number of documents that failed.
        errors_json: JSON text with error details; ignored when nothing failed.

    Returns:
        The id of the inserted jobs row.
    """
    # jobs.status is CHECK-constrained to queued/processing/done/failed/skipped,
    # so any other label makes the INSERT raise IntegrityError and the audit
    # row is lost exactly when something went wrong. A run with failures is
    # therefore stored as 'failed'; comparing the matched (document_id) and
    # succeeded (chunk_count) columns still shows whether it was partial.
    status = "done" if failed == 0 else "failed"
    error_text = errors_json if failed > 0 else None
    cur = conn.execute(
        """INSERT INTO jobs(filename, status, job_type, document_id, chunk_count, error, completed_at)
        VALUES (?, ?, ?, ?, ?, ?, current_timestamp)""",
        (filters_json, status, job_type, matched, succeeded, error_text),
    )
    conn.commit()
    return cur.lastrowid
def count_documents(conn: sqlite3.Connection) -> int:
    """Count the rows in the documents table."""
    cursor = conn.execute("SELECT COUNT(*) AS cnt FROM documents")
    return cursor.fetchone()["cnt"]
# ---------------------------------------------------------------------------
# Vec table management
# ---------------------------------------------------------------------------
def recreate_vec_table(conn: sqlite3.Connection, dim: int) -> None:
    """Replace chunks_vec with an empty vec0 table of width *dim* and commit.

    Any existing chunks_vec table is dropped first, so all stored embeddings
    are discarded; this is meant for reindexing.
    """
    create_sql = f"CREATE VIRTUAL TABLE chunks_vec USING vec0(embedding float[{dim}], chunk_id integer)"
    conn.execute("DROP TABLE IF EXISTS chunks_vec")
    conn.execute(create_sql)
    conn.commit()
# ---------------------------------------------------------------------------
# Job CRUD
# ---------------------------------------------------------------------------
# Job statuses that mark a job as finished; update_job_status stamps
# completed_at when a job moves into one of these.
_TERMINAL_STATUSES = {"done", "failed", "skipped"}
def create_job(
    conn: sqlite3.Connection,
    filename: str,
    staging_path: str,
    doc_type: Optional[str] = None,
    tags_json: str = "[]",
    title: Optional[str] = None,
    content_hash: Optional[str] = None,
) -> int:
    """Insert a jobs row for a staged file, commit, and return the new job id.

    Columns not listed in the INSERT (such as ``status``) take their table
    defaults.
    """
    insert_sql = (
        "INSERT INTO jobs(filename, staging_path, doc_type, tags_json, title, content_hash) VALUES (?, ?, ?, ?, ?, ?)"
    )
    values = (filename, staging_path, doc_type, tags_json, title, content_hash)
    cursor = conn.execute(insert_sql, values)
    conn.commit()
    return cursor.lastrowid
def get_job(conn: sqlite3.Connection, job_id: int) -> Optional[sqlite3.Row]:
    """Fetch the jobs row with the given id; ``None`` when no such job exists."""
    cursor = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
    return cursor.fetchone()
def list_jobs(conn: sqlite3.Connection, status: Optional[str] = None) -> list[sqlite3.Row]:
    """List jobs sorted by ``created_at`` descending.

    A non-empty *status* restricts the result to jobs with exactly that
    status; ``None`` or an empty string returns every job.
    """
    if not status:
        return conn.execute("SELECT * FROM jobs ORDER BY created_at DESC").fetchall()
    filtered = conn.execute(
        "SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC", (status,)
    )
    return filtered.fetchall()
def update_job_status(
    conn: sqlite3.Connection,
    job_id: int,
    status: str,
    error: Optional[str] = None,
    document_id: Optional[int] = None,
    chunk_count: Optional[int] = None,
) -> None:
    """Set a job's status, plus any of error/document_id/chunk_count that are given, and commit.

    Optional fields left as ``None`` are not touched. When *status* is one of
    the terminal statuses, ``completed_at`` is set to the current timestamp
    in the same UPDATE.
    """
    optional_columns = (
        ("error", error),
        ("document_id", document_id),
        ("chunk_count", chunk_count),
    )
    assignments = ["status = ?"]
    bind_values: list[Any] = [status]
    for column, value in optional_columns:
        if value is not None:
            assignments.append(f"{column} = ?")
            bind_values.append(value)

    # completed_at takes no bound parameter; the database supplies the time.
    if status in _TERMINAL_STATUSES:
        assignments.append("completed_at = current_timestamp")

    bind_values.append(job_id)
    conn.execute(f"UPDATE jobs SET {', '.join(assignments)} WHERE id = ?", bind_values)
    conn.commit()