Upload-time duplicate detection, FTS5 query sanitization, release guard

- Reject duplicate uploads at the API boundary (HTTP 409) instead of
  silently skipping in the background worker. Checks both ingested
  documents and in-flight jobs via content_hash on the jobs table.
- Go client handles 409 with distinct messages for already-imported
  documents vs already-queued jobs.
- Sanitize FTS5 search queries by quoting each token to prevent syntax
  errors from special characters like ?, *, ", (), AND, OR, NOT.
- Add try/except safety net around FTS5 execute for edge cases.
- Add main branch guard to release.sh to prevent releasing from
  feature branches.
- Update specs and README to reflect new behaviour.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-26 23:05:07 +00:00
parent 63654a59b8
commit 6fec627503
20 changed files with 536 additions and 30 deletions
+31 -2
View File
@@ -94,6 +94,7 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
document_id INTEGER,
chunk_count INTEGER DEFAULT 0,
staging_path TEXT,
content_hash TEXT,
created_at TEXT DEFAULT current_timestamp,
completed_at TEXT
);
@@ -108,6 +109,11 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
f"CREATE VIRTUAL TABLE chunks_vec USING vec0(embedding float[{embedding_dim}], chunk_id integer)"
)
# Migrate: add content_hash to jobs if missing (added in v2.0.5)
cols = {row[1] for row in conn.execute("PRAGMA table_info(jobs)").fetchall()}
if "content_hash" not in cols:
conn.execute("ALTER TABLE jobs ADD COLUMN content_hash TEXT")
conn.commit()
@@ -142,6 +148,28 @@ def hash_exists(conn: sqlite3.Connection, content_hash: str) -> bool:
return row is not None
def get_document_by_hash(conn: sqlite3.Connection, content_hash: str) -> dict | None:
"""Return duplicate info for a given hash, or None.
Checks both the documents table (already ingested) and the jobs table
(queued/processing). Returns a dict with either ``document_id`` or
``job_id`` so callers can distinguish the two cases.
"""
row = conn.execute(
"SELECT id, title FROM documents WHERE content_hash = ?", (content_hash,)
).fetchone()
if row is not None:
return {"document_id": row["id"], "title": row["title"]}
# Also check pending/processing jobs that haven't been committed to documents yet
row = conn.execute(
"SELECT id, filename FROM jobs WHERE content_hash = ? AND status IN ('queued', 'processing')",
(content_hash,),
).fetchone()
if row is not None:
return {"job_id": row["id"], "title": row["filename"]}
return None
def insert_document(
conn: sqlite3.Connection,
title: str,
@@ -252,11 +280,12 @@ def create_job(
doc_type: Optional[str] = None,
tags_json: str = "[]",
title: Optional[str] = None,
content_hash: Optional[str] = None,
) -> int:
"""Create a new ingest job and return its id."""
cur = conn.execute(
"INSERT INTO jobs(filename, staging_path, doc_type, tags_json, title) VALUES (?, ?, ?, ?, ?)",
(filename, staging_path, doc_type, tags_json, title),
"INSERT INTO jobs(filename, staging_path, doc_type, tags_json, title, content_hash) VALUES (?, ?, ?, ?, ?, ?)",
(filename, staging_path, doc_type, tags_json, title, content_hash),
)
conn.commit()
return cur.lastrowid
+24 -8
View File
@@ -1,13 +1,15 @@
"""Job management endpoints — submit files/notes for ingestion and track progress."""
import hashlib
import json
from typing import Optional
from fastapi import HTTPException, UploadFile, File, Form, Query
from fastapi.responses import JSONResponse
from main import app
from kb.config import cfg
from kb.database import get_connection, create_job, get_job, list_jobs
from kb.database import get_connection, create_job, get_job, list_jobs, get_document_by_hash
from kb.staging import stage_file, stage_note
@@ -27,18 +29,32 @@ async def submit_job(
if file:
content = await file.read()
staging_path = stage_file(cfg.staging_dir, file.filename, content)
content_hash = hashlib.sha256(content).hexdigest()
filename = file.filename
else:
staging_path = stage_note(cfg.staging_dir, title or "note", note)
filename = staging_path.name
tags_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
tags_json = json.dumps(tags_list)
content = note.encode("utf-8")
content_hash = hashlib.sha256(content).hexdigest()
filename = None
conn = get_connection(cfg.db_path)
try:
job_id = create_job(conn, filename, str(staging_path), doc_type, tags_json, title)
existing = get_document_by_hash(conn, content_hash)
if existing:
return JSONResponse(
status_code=409,
content={"error": "duplicate", **existing},
)
if file:
staging_path = stage_file(cfg.staging_dir, file.filename, content)
else:
staging_path = stage_note(cfg.staging_dir, title or "note", note)
filename = staging_path.name
tags_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
tags_json = json.dumps(tags_list)
job_id = create_job(conn, filename, str(staging_path), doc_type, tags_json, title, content_hash)
return {"job_id": job_id, "status": "queued", "filename": filename}
finally:
conn.close()
+28 -2
View File
@@ -1,9 +1,12 @@
"""Hybrid search — FTS5 + sqlite-vec with Reciprocal Rank Fusion."""
import json
import logging
import struct
import sqlite3
logger = logging.getLogger("kb.search")
def hybrid_search(
conn: sqlite3.Connection,
@@ -74,6 +77,21 @@ def hybrid_search(
# Internal helpers
# ---------------------------------------------------------------------------
def _sanitize_fts_query(query: str) -> str:
"""Escape a raw user query for safe use with FTS5 MATCH.
Splits on whitespace, strips double quotes from each token, wraps each
token in double quotes (making FTS5 treat all content as literals), and
joins with spaces. Returns empty string if no valid tokens remain.
"""
tokens = []
for token in query.split():
token = token.replace('"', '')
if token:
tokens.append(f'"{token}"')
return " ".join(tokens)
def _fts_search(
conn: sqlite3.Connection,
query: str,
@@ -86,10 +104,14 @@ def _fts_search(
Returns:
{chunk_id: bm25_score} where scores are positive (higher = better).
"""
safe_query = _sanitize_fts_query(query)
if not safe_query:
return {}
sql = "SELECT f.rowid AS chunk_id, bm25(chunks_fts) AS rank FROM chunks_fts f"
joins: list[str] = []
where: list[str] = ["chunks_fts MATCH ?"]
params: list = [query]
params: list = [safe_query]
if tags or doc_type:
joins.append("JOIN chunks c ON f.rowid = c.id")
@@ -111,7 +133,11 @@ def _fts_search(
sql += " ORDER BY rank LIMIT ?"
params.append(limit)
rows = conn.execute(sql, params).fetchall()
try:
rows = conn.execute(sql, params).fetchall()
except sqlite3.OperationalError:
logger.warning("FTS5 query failed for input: %r", query)
return {}
# BM25 returns negative values (lower = better match); negate so
# higher = better.