e7136a4a20
New MCP server (mcp/) exposes kb operations as native MCP tools over
Streamable HTTP with Bearer token auth. Supports collections via tag
conventions, chunked file uploads, and agent-side search patterns.
Engine gains PATCH /api/v1/notes/{id} for in-place note updates with
transactional re-chunk/re-embed, and updated_at column on documents.
Go client adds updatenote command and Patch HTTP method.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
414 lines
15 KiB
Python
414 lines
15 KiB
Python
"""Database module for kb-engine v2.

Provides SQLite database access with WAL mode, FTS5 full-text search,
and sqlite-vec vector storage for embeddings.
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import struct
|
|
from typing import Any, Optional
|
|
|
|
|
|
def build_enriched_text(title: str, chunk_text: str, metadata: dict | None = None) -> str:
|
|
"""Build enriched text by prepending document title and optional section header.
|
|
|
|
Format: "{title} > {section_header}\\n\\n{chunk_text}" or "{title}\\n\\n{chunk_text}".
|
|
"""
|
|
section_header = (metadata or {}).get("section_header")
|
|
if section_header:
|
|
return f"{title} > {section_header}\n\n{chunk_text}"
|
|
return f"{title}\n\n{chunk_text}"
|
|
|
|
|
|
def _parse_chunk_metadata(raw: Any) -> dict | None:
    """Decode a ``chunks.metadata`` value into a dict, or return None if unusable.

    Empty/NULL values, text that is not valid JSON, and JSON that does not
    decode to an object all yield None.
    """
    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError subclass
        return None
    return parsed if isinstance(parsed, dict) else None


def _backfill_enriched_text(conn: sqlite3.Connection) -> None:
    """Backfill ``chunks.enriched_text`` for all existing chunks.

    Each chunk's enriched text is rebuilt from its document's title, its own
    text and the ``section_header`` found in its JSON ``metadata`` (via
    ``build_enriched_text``). Chunks whose metadata is not a JSON object are
    treated as having no metadata rather than aborting the migration.

    Reads columns by name, so ``conn.row_factory`` must be ``sqlite3.Row``.
    Does not commit; the caller owns the transaction.
    """
    rows = conn.execute(
        "SELECT c.id, c.text, c.metadata, d.title "
        "FROM chunks c JOIN documents d ON c.document_id = d.id"
    ).fetchall()

    # metadata is free-form TEXT and not guaranteed to hold a JSON object; one
    # bad row must not make schema initialisation fail on startup.
    conn.executemany(
        "UPDATE chunks SET enriched_text = ? WHERE id = ?",
        (
            (
                build_enriched_text(row["title"], row["text"], _parse_chunk_metadata(row["metadata"])),
                row["id"],
            )
            for row in rows
        ),
    )
|
|
|
|
|
|
def _rebuild_fts(conn: sqlite3.Connection) -> None:
    """Drop and recreate chunks_fts to index enriched_text, with updated triggers.

    Called by the ``enriched_text`` migration in ``init_schema`` after the new
    column has been backfilled. It does four things:

    1. Drops the three ``chunks`` sync triggers and the old ``chunks_fts`` table.
    2. Recreates ``chunks_fts``.
    3. Recreates the triggers (the same definitions ``init_schema`` uses), so that
       inserts, deletes and updates on ``chunks`` feed ``enriched_text`` into the
       index.
    4. Repopulates the index from every existing chunk.

    ``executescript`` commits any pending transaction on *conn* before it runs,
    so earlier work on the connection (e.g. the backfill) is committed here.
    This function does not commit the final repopulating INSERT itself.
    """
    # NOTE(review): chunks_fts is an external-content table (content=chunks)
    # whose only column is named ``text``. FTS5 fetches column values from the
    # content-table column of the same name, i.e. chunks.text, while the
    # triggers index chunks.enriched_text. Consequences:
    #   * the 'delete' commands pass old.enriched_text so they remove exactly
    #     the tokens that were indexed;
    #   * the FTS5 'rebuild' command would index chunks.text instead, which is
    #     why the table is repopulated with an explicit INSERT ... SELECT below.
    conn.executescript("""
        DROP TRIGGER IF EXISTS chunks_ai;
        DROP TRIGGER IF EXISTS chunks_ad;
        DROP TRIGGER IF EXISTS chunks_au;
        DROP TABLE IF EXISTS chunks_fts;

        CREATE VIRTUAL TABLE chunks_fts USING fts5(
            text,
            content=chunks,
            content_rowid=id
        );

        CREATE TRIGGER chunks_ai AFTER INSERT ON chunks BEGIN
            INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
        END;

        CREATE TRIGGER chunks_ad AFTER DELETE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
        END;

        CREATE TRIGGER chunks_au AFTER UPDATE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
            INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
        END;
    """)
    # Repopulate FTS from existing enriched_text
    conn.execute("INSERT INTO chunks_fts(rowid, text) SELECT id, enriched_text FROM chunks")
|
|
|
|
|
|
def get_connection(db_path: str) -> sqlite3.Connection:
    """Return a sqlite3 connection with WAL mode, Row factory, and foreign keys enabled.

    The returned connection also has:

    * the sqlite-vec extension loaded, with extension loading switched off
      again afterwards;
    * ``sqlite3.Row`` as its row factory.

    Raises:
        Whatever ``sqlite3.connect``, ``sqlite_vec.load`` or the PRAGMAs raise.
        If setup fails after the connection was opened, the connection is
        closed before the exception propagates, so no handle is leaked.
    """
    # Imported here rather than at module level; presumably so this module can
    # be imported without sqlite-vec installed.
    import sqlite_vec

    conn = sqlite3.connect(db_path)
    try:
        conn.enable_load_extension(True)
        try:
            sqlite_vec.load(conn)
        finally:
            # Never leave arbitrary extension loading enabled, even if load() failed.
            conn.enable_load_extension(False)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # Foreign-key enforcement is per connection and off by default in SQLite;
        # the schema's ON DELETE CASCADE clauses rely on it.
        conn.execute("PRAGMA foreign_keys=ON")
    except BaseException:
        # Cleanup-and-reraise: don't leak the handle on any failure (incl. Ctrl-C).
        conn.close()
        raise
    return conn
|
|
|
|
|
|
def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
    """Create all tables if they do not already exist, then migrate older databases.

    Args:
        conn: Connection to initialise. Creating ``chunks_vec`` uses the
            sqlite-vec ``vec0`` module, so the extension must already be
            loaded on *conn* (``get_connection`` does this).
        embedding_dim: Vector size used only when ``chunks_vec`` is created
            here. An existing ``chunks_vec`` is left untouched, whatever its
            dimension.

    Tables, the FTS table and the triggers use ``IF NOT EXISTS``. Each
    migration checks ``PRAGMA table_info`` for the column it adds, so repeated
    calls on an up-to-date database change nothing. All pending work is
    committed before returning.
    """
    # The script is an f-string: '{{}}' below renders as the SQL literal '{}'.
    # executescript() commits any pending transaction on conn before running.
    # On an older database that already has chunks_fts and its triggers, the
    # IF NOT EXISTS statements keep the existing definitions; the enriched_text
    # migration further down replaces them.
    conn.executescript(f"""
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY,
            title TEXT,
            source_path TEXT,
            content_hash TEXT UNIQUE,
            doc_type TEXT,
            language TEXT,
            stored_path TEXT,
            original_filename TEXT,
            created_at TEXT DEFAULT current_timestamp
        );

        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY,
            document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
            chunk_index INTEGER,
            text TEXT,
            enriched_text TEXT,
            token_count INTEGER,
            metadata TEXT DEFAULT '{{}}',
            UNIQUE(document_id, chunk_index)
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
            text,
            content=chunks,
            content_rowid=id
        );

        -- Triggers to keep FTS index in sync with chunks table (using enriched_text)
        CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
            INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
        END;

        CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
        END;

        CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, text) VALUES ('delete', old.id, old.enriched_text);
            INSERT INTO chunks_fts(rowid, text) VALUES (new.id, new.enriched_text);
        END;

        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE COLLATE NOCASE
        );

        CREATE TABLE IF NOT EXISTS document_tags (
            document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
            tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
            UNIQUE(document_id, tag_id)
        );

        CREATE TABLE IF NOT EXISTS config (
            key TEXT PRIMARY KEY,
            value TEXT
        );

        CREATE TABLE IF NOT EXISTS jobs (
            id INTEGER PRIMARY KEY,
            filename TEXT,
            status TEXT DEFAULT 'queued' CHECK(status IN ('queued','processing','done','failed','skipped')),
            doc_type TEXT,
            tags_json TEXT DEFAULT '[]',
            title TEXT,
            error TEXT,
            document_id INTEGER,
            chunk_count INTEGER DEFAULT 0,
            staging_path TEXT,
            content_hash TEXT,
            created_at TEXT DEFAULT current_timestamp,
            completed_at TEXT
        );
    """)

    # sqlite-vec virtual table (cannot use IF NOT EXISTS with vec0, so check first)
    # embedding_dim only matters on first creation; recreate_vec_table() is the
    # way to change the dimension of an existing table.
    existing = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_vec'"
    ).fetchone()
    if not existing:
        conn.execute(
            f"CREATE VIRTUAL TABLE chunks_vec USING vec0(embedding float[{embedding_dim}], chunk_id integer)"
        )

    # Migrate: add content_hash to jobs if missing (added in v2.0.5)
    # PRAGMA table_info rows have the column name at index 1.
    cols = {row[1] for row in conn.execute("PRAGMA table_info(jobs)").fetchall()}
    if "content_hash" not in cols:
        conn.execute("ALTER TABLE jobs ADD COLUMN content_hash TEXT")

    # Migrate: add stored_path and original_filename to documents if missing
    # doc_cols is read once here and reused by the updated_at check below; that is
    # safe because none of the ALTERs in between adds updated_at.
    doc_cols = {row[1] for row in conn.execute("PRAGMA table_info(documents)").fetchall()}
    if "stored_path" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN stored_path TEXT")
    if "original_filename" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN original_filename TEXT")

    # Migrate: add enriched_text to chunks and rebuild FTS to index it
    # Order matters: the column must be backfilled before _rebuild_fts()
    # repopulates chunks_fts from it. Databases created by the CREATE TABLE
    # above already have enriched_text and skip this step.
    chunk_cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
    if "enriched_text" not in chunk_cols:
        conn.execute("ALTER TABLE chunks ADD COLUMN enriched_text TEXT")
        _backfill_enriched_text(conn)
        _rebuild_fts(conn)

    # Migrate: add updated_at to documents if missing (v3.0.0)
    # The CREATE TABLE above has no updated_at, so this also runs once on every
    # freshly created database.
    if "updated_at" not in doc_cols:
        conn.execute("ALTER TABLE documents ADD COLUMN updated_at TEXT")

    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_db_config(conn: sqlite3.Connection, key: str, default: Optional[str] = None) -> Optional[str]:
    """Look up *key* in the ``config`` table.

    Returns the stored value, or *default* if the key has no row. A stored
    NULL is returned as None, not replaced by *default*.
    """
    cursor = conn.execute("SELECT value FROM config WHERE key = ?", (key,))
    match = cursor.fetchone()
    if match is None:
        return default
    return match["value"]
|
|
|
|
|
|
def set_db_config(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Upsert ``key -> value`` into the ``config`` table and commit.

    An existing row for *key* has its value overwritten; otherwise a new row
    is inserted.
    """
    upsert_sql = "INSERT INTO config(key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    conn.execute(upsert_sql, (key, value))
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Document helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hash_exists(conn: sqlite3.Connection, content_hash: str) -> bool:
    """Return True if a document with *content_hash* is already in ``documents``.

    Only the documents table is consulted; queued jobs are not checked.
    """
    cursor = conn.execute("SELECT 1 FROM documents WHERE content_hash = ?", (content_hash,))
    return cursor.fetchone() is not None
|
|
|
|
|
|
def get_document_by_hash(conn: sqlite3.Connection, content_hash: str) -> dict | None:
|
|
"""Return duplicate info for a given hash, or None.
|
|
|
|
Checks both the documents table (already ingested) and the jobs table
|
|
(queued/processing). Returns a dict with either ``document_id`` or
|
|
``job_id`` so callers can distinguish the two cases.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT id, title FROM documents WHERE content_hash = ?", (content_hash,)
|
|
).fetchone()
|
|
if row is not None:
|
|
return {"document_id": row["id"], "title": row["title"]}
|
|
# Also check pending/processing jobs that haven't been committed to documents yet
|
|
row = conn.execute(
|
|
"SELECT id, filename FROM jobs WHERE content_hash = ? AND status IN ('queued', 'processing')",
|
|
(content_hash,),
|
|
).fetchone()
|
|
if row is not None:
|
|
return {"job_id": row["id"], "title": row["filename"]}
|
|
return None
|
|
|
|
|
|
def insert_document(
    conn: sqlite3.Connection,
    title: str,
    source_path: str,
    content_hash: str,
    doc_type: str,
    language: Optional[str] = None,
) -> int:
    """Add a row to ``documents``, commit, and return the new row id.

    The remaining columns (e.g. stored_path, original_filename) are not set
    here and keep their defaults.
    """
    values = (title, source_path, content_hash, doc_type, language)
    cursor = conn.execute(
        "INSERT INTO documents(title, source_path, content_hash, doc_type, language) VALUES (?, ?, ?, ?, ?)",
        values,
    )
    conn.commit()
    new_id = cursor.lastrowid
    return new_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chunk / embedding helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def insert_chunk(
|
|
conn: sqlite3.Connection,
|
|
document_id: int,
|
|
chunk_index: int,
|
|
text: str,
|
|
enriched_text: str | None = None,
|
|
token_count: Optional[int] = None,
|
|
metadata: Any = None,
|
|
) -> int:
|
|
"""Insert a chunk and return its id. *metadata* is JSON-serialized if it is a dict."""
|
|
if metadata is None:
|
|
metadata_str = "{}"
|
|
elif isinstance(metadata, dict):
|
|
metadata_str = json.dumps(metadata)
|
|
else:
|
|
metadata_str = str(metadata)
|
|
|
|
cur = conn.execute(
|
|
"INSERT INTO chunks(document_id, chunk_index, text, enriched_text, token_count, metadata) VALUES (?, ?, ?, ?, ?, ?)",
|
|
(document_id, chunk_index, text, enriched_text or text, token_count, metadata_str),
|
|
)
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
|
|
def insert_embedding(conn: sqlite3.Connection, chunk_id: int, embedding: list[float]) -> None:
    """Store *embedding* for *chunk_id* in ``chunks_vec`` and commit.

    The vector is serialized with ``struct`` as ``len(embedding)`` consecutive
    native-order 32-bit floats.
    """
    payload = struct.pack(f"{len(embedding)}f", *embedding)
    row_values = (payload, chunk_id)
    conn.execute("INSERT INTO chunks_vec(embedding, chunk_id) VALUES (?, ?)", row_values)
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tagging helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def tag_document(conn: sqlite3.Connection, document_id: int, tag_names: list[str]) -> None:
    """Attach every name in *tag_names* to a document, creating missing tags.

    Existing tags and existing document/tag links are left alone (INSERT OR
    IGNORE). ``tags.name`` is declared ``COLLATE NOCASE`` in the schema, so
    names differing only in ASCII case resolve to the same tag. Commits once
    at the end.
    """
    for tag_name in tag_names:
        name_param = (tag_name,)
        conn.execute("INSERT OR IGNORE INTO tags(name) VALUES (?)", name_param)
        tag_row = conn.execute("SELECT id FROM tags WHERE name = ?", name_param).fetchone()
        link = (document_id, tag_row["id"])
        conn.execute(
            "INSERT OR IGNORE INTO document_tags(document_id, tag_id) VALUES (?, ?)",
            link,
        )
    conn.commit()
|
|
|
|
|
|
def untag_document(conn: sqlite3.Connection, document_id: int, tag_names: list[str]) -> None:
    """Detach each named tag from a document; unknown tag names are skipped.

    Only the ``document_tags`` links are removed; the ``tags`` rows remain
    even if no document references them any more. Commits once at the end.
    """
    for tag_name in tag_names:
        tag_row = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()
        if not tag_row:
            continue
        conn.execute(
            "DELETE FROM document_tags WHERE document_id = ? AND tag_id = ?",
            (document_id, tag_row["id"]),
        )
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Vec table management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def recreate_vec_table(conn: sqlite3.Connection, dim: int) -> None:
    """Replace ``chunks_vec`` with an empty vec0 table of dimension *dim*.

    Every stored embedding is discarded, so callers are expected to
    re-embed afterwards (e.g. when reindexing with a different model).
    Commits.
    """
    conn.execute("DROP TABLE IF EXISTS chunks_vec")
    create_sql = f"CREATE VIRTUAL TABLE chunks_vec USING vec0(embedding float[{dim}], chunk_id integer)"
    conn.execute(create_sql)
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Job CRUD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Job statuses treated as terminal: update_job_status() stamps completed_at
# when a job is moved into one of these.
_TERMINAL_STATUSES = {
    "skipped",
    "failed",
    "done",
}
|
|
|
|
|
|
def create_job(
    conn: sqlite3.Connection,
    filename: str,
    staging_path: str,
    doc_type: Optional[str] = None,
    tags_json: str = "[]",
    title: Optional[str] = None,
    content_hash: Optional[str] = None,
) -> int:
    """Record a new ingest job, commit, and return its id.

    ``status`` and ``created_at`` are not passed, so they take the column
    defaults from the schema. *tags_json* is stored verbatim.
    """
    row_values = (filename, staging_path, doc_type, tags_json, title, content_hash)
    cursor = conn.execute(
        "INSERT INTO jobs(filename, staging_path, doc_type, tags_json, title, content_hash) VALUES (?, ?, ?, ?, ?, ?)",
        row_values,
    )
    conn.commit()
    new_id = cursor.lastrowid
    return new_id
|
|
|
|
|
|
def get_job(conn: sqlite3.Connection, job_id: int) -> Optional[sqlite3.Row]:
    """Fetch the ``jobs`` row with id *job_id*; None if there is no such job."""
    cursor = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
    job_row = cursor.fetchone()
    return job_row
|
|
|
|
|
|
def list_jobs(conn: sqlite3.Connection, status: Optional[str] = None) -> list[sqlite3.Row]:
    """Return jobs ordered newest first, optionally filtered by status.

    Args:
        conn: Open connection.
        status: If truthy, only jobs with exactly this status are returned;
            None or "" returns all jobs.

    ``created_at`` defaults to SQLite's ``current_timestamp``, which has
    one-second resolution. Jobs created within the same second would
    otherwise come back in an unspecified order, so ``id`` is used as a
    tie-breaker; later inserts normally receive larger ids.
    """
    if status:
        return conn.execute(
            "SELECT * FROM jobs WHERE status = ? ORDER BY created_at DESC, id DESC", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM jobs ORDER BY created_at DESC, id DESC").fetchall()
|
|
|
|
|
|
def update_job_status(
    conn: sqlite3.Connection,
    job_id: int,
    status: str,
    error: Optional[str] = None,
    document_id: Optional[int] = None,
    chunk_count: Optional[int] = None,
) -> None:
    """Set a job's status and, optionally, its error / document_id / chunk_count.

    Optional fields passed as None are left unchanged, so this function cannot
    clear them. Moving a job into one of ``_TERMINAL_STATUSES`` also stamps
    ``completed_at`` with SQLite's ``current_timestamp``. Commits.
    """
    optional_assignments = (
        ("error = ?", error),
        ("document_id = ?", document_id),
        ("chunk_count = ?", chunk_count),
    )

    set_clauses = ["status = ?"]
    bind_values: list[Any] = [status]
    for clause, value in optional_assignments:
        if value is None:
            continue
        set_clauses.append(clause)
        bind_values.append(value)

    if status in _TERMINAL_STATUSES:
        # Evaluated inside SQLite, so it needs no bound parameter.
        set_clauses.append("completed_at = current_timestamp")

    # The SET list is built only from the fixed fragments above; all values are bound.
    bind_values.append(job_id)
    conn.execute(f"UPDATE jobs SET {', '.join(set_clauses)} WHERE id = ?", bind_values)
    conn.commit()
|