Files
kb/engine/kb/routes/bulk.py
T
steve b5a203d2aa Add bulk operations and remove collections abstraction
- Add bulk delete, bulk tags, and bulk set-tags engine endpoints
  (POST /api/v1/bulk/delete, /bulk/tags, /bulk/set-tags)
- Filter-based selection: by tags, doc_type, ID list, ID range
- Safety threshold (KB_BULK_SAFETY_PERCENT, default 70%) prevents
  accidental mass operations unless force=true
- Synchronous execution with audit trail via jobs table
- Add kb_bulk_delete, kb_bulk_tags, kb_bulk_set_tags MCP tools
- Add kb bulk-remove, bulk-tag, bulk-set-tags CLI commands
- Remove collection abstraction from MCP server (use tags instead)
- Remove kb_set_collection MCP tool
- Update SKILL.md, MCP.md, README.md documentation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:34:47 +01:00

282 lines
8.8 KiB
Python

"""Bulk operation endpoints — delete, tag, and set-tags on multiple documents."""
import json
import logging
from pathlib import Path
from typing import Optional
from fastapi import HTTPException
from pydantic import BaseModel, model_validator
from main import app
from kb.config import cfg
from kb.database import (
get_connection,
resolve_bulk_selection,
count_documents,
create_bulk_job,
tag_document,
untag_document,
)
logger = logging.getLogger("kb.routes.bulk")
# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------
class BulkSelectionRequest(BaseModel):
    """Base request body shared by the bulk endpoints: which documents to target.

    The five selection fields are forwarded unchanged to
    ``resolve_bulk_selection`` by the endpoints in this module; how they are
    matched and combined is decided there. At least one of them must be
    supplied, otherwise validation fails.
    """

    # Selection filters. For the first three, a falsy value (None, an empty
    # list, an empty string) counts as "not supplied" during validation.
    document_ids: Optional[list[int]] = None
    tags: Optional[list[str]] = None
    doc_type: Optional[str] = None
    # ID bounds. Only None counts as "not supplied", so 0 is a valid bound.
    from_id: Optional[int] = None
    to_id: Optional[int] = None
    # Passed to the safety threshold check, which is skipped when this is true.
    force: bool = False

    @model_validator(mode="after")
    def require_at_least_one_filter(self):
        """Reject a request that does not supply any selection filter.

        Raises:
            ValueError: if every selection field is unset.
        """
        has_filter = (
            bool(self.document_ids)
            or bool(self.tags)
            or bool(self.doc_type)
            or self.from_id is not None
            or self.to_id is not None
        )
        if not has_filter:
            raise ValueError("At least one selection filter is required")
        return self
class BulkDeleteRequest(BulkSelectionRequest):
    """Request body for the bulk delete endpoint.

    Adds no fields of its own; the selection filters and ``force`` flag come
    from ``BulkSelectionRequest``.
    """
class BulkTagsRequest(BulkSelectionRequest):
    """Request body for the bulk tags endpoint (add and/or remove tags).

    On top of the inherited selection filters, at least one of ``add`` or
    ``remove`` must be a non-empty list.
    """

    # Tags handed to tag_document() for every selected document.
    add: Optional[list[str]] = None
    # Tags handed to untag_document() for every selected document.
    remove: Optional[list[str]] = None

    @model_validator(mode="after")
    def require_add_or_remove(self):
        """Reject a request that neither adds nor removes any tag.

        Raises:
            ValueError: if both ``add`` and ``remove`` are None or empty.
        """
        if self.add or self.remove:
            return self
        raise ValueError("At least one of 'add' or 'remove' is required")
class BulkSetTagsRequest(BulkSelectionRequest):
    """Request body for the bulk set-tags endpoint.

    Inherits the selection filters and ``force`` flag from
    ``BulkSelectionRequest``.
    """

    # Required. The endpoint first deletes each selected document's existing
    # tags, then applies this list; an empty list therefore clears all tags.
    new_tags: list[str]
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _check_safety_threshold(matched: int, total: int, force: bool) -> None:
    """Abort with HTTP 409 when an operation would touch too many documents.

    The share ``matched / total`` (as a percentage) is compared against
    ``cfg.bulk_safety_percent``. The check is skipped when that setting is
    zero or negative, when ``force`` is true, or when ``total`` is zero.

    Args:
        matched: Number of documents the operation would affect.
        total: Number of documents the share is measured against.
        force: When true, the threshold is not enforced.

    Raises:
        HTTPException: status 409 with a structured ``detail`` payload when
            the percentage is strictly greater than the threshold.
    """
    threshold = cfg.bulk_safety_percent
    if threshold <= 0:
        return
    if force or total == 0:
        return

    percent = (matched / total) * 100
    if not percent > threshold:
        return

    detail = {
        "error": "safety_threshold_exceeded",
        "message": (
            f"Operation would affect {matched} of {total} documents "
            f"({percent:.1f}%). Exceeds safety threshold of {threshold}%. "
            f"Use force: true to proceed."
        ),
        "matched": matched,
        "total": total,
        "percent": round(percent, 1),
        "threshold": threshold,
    }
    raise HTTPException(status_code=409, detail=detail)
def _filters_dict(req: BulkSelectionRequest) -> str:
    """Serialize the selection filters supplied on ``req`` to a JSON string.

    Despite the name, the return value is the JSON text of the dict, not the
    dict itself; the endpoints pass it to ``create_bulk_job`` for the audit
    record. Filters that were not supplied are left out.

    Args:
        req: Any bulk request carrying the selection fields.

    Returns:
        A JSON object string with keys in the order ``document_ids``,
        ``tags``, ``doc_type``, ``from_id``, ``to_id``.
    """
    supplied = {}
    # These filters are included only when truthy (non-empty).
    for name in ("document_ids", "tags", "doc_type"):
        value = getattr(req, name)
        if value:
            supplied[name] = value
    # ID bounds are included whenever set, so a bound of 0 is kept.
    for name in ("from_id", "to_id"):
        value = getattr(req, name)
        if value is not None:
            supplied[name] = value
    return json.dumps(supplied)
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.post("/api/v1/bulk/delete")
async def bulk_delete(req: BulkDeleteRequest):
    """Delete every document selected by ``req`` and record an audit job.

    Steps: resolve the selection, enforce the safety threshold, delete each
    document's ``chunks_vec`` rows and its ``documents`` row, commit once,
    then remove the stored files of the documents that were deleted and write
    a ``bulk_delete`` job via ``create_bulk_job``.

    A stored file is only queued for removal after its document's delete
    statements have succeeded, so a document whose delete fails keeps its
    file on disk.

    Args:
        req: Selection filters plus the ``force`` flag.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and
        ``errors`` (one ``{"document_id", "error"}`` entry per failure).

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured share of all documents and ``force`` is
            not set.
    """
    conn = get_connection(cfg.db_path)
    try:
        doc_ids = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        total = count_documents(conn)
        _check_safety_threshold(len(doc_ids), total, req.force)

        succeeded = 0
        failed = 0
        errors = []
        stored_files: list[str] = []
        for doc_id in doc_ids:
            try:
                doc = conn.execute(
                    "SELECT id, stored_path FROM documents WHERE id = ?", (doc_id,)
                ).fetchone()
                if not doc:
                    failed += 1
                    errors.append({"document_id": doc_id, "error": "not found"})
                    continue
                stored_path = doc["stored_path"]

                # Delete embeddings
                chunk_rows = conn.execute(
                    "SELECT id FROM chunks WHERE document_id = ?", (doc_id,)
                ).fetchall()
                for row in chunk_rows:
                    conn.execute(
                        "DELETE FROM chunks_vec WHERE chunk_id = ?", (row["id"],)
                    )

                # Delete document (cascades to chunks, document_tags)
                conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))
            except Exception as exc:
                # NOTE(review): statements that ran before the failure are
                # presumably still part of the pending transaction and will be
                # committed below — consider a per-document SAVEPOINT.
                failed += 1
                errors.append({"document_id": doc_id, "error": str(exc)})
                continue

            succeeded += 1
            # Queue the file only now: if the delete above had raised, the
            # document still exists and must keep its stored file.
            if stored_path:
                stored_files.append(stored_path)

        conn.commit()

        # Best-effort file cleanup after commit
        for path in stored_files:
            try:
                Path(path).unlink(missing_ok=True)
            except OSError as exc:
                logger.warning("Failed to delete stored file %s: %s", path, exc)

        job_id = create_bulk_job(
            conn, "bulk_delete", _filters_dict(req),
            len(doc_ids), succeeded, failed, json.dumps(errors),
        )
        return {
            "job_id": job_id,
            "status": "done" if failed == 0 else "partial_failure",
            "matched": len(doc_ids),
            "succeeded": succeeded,
            "failed": failed,
            "errors": errors,
        }
    finally:
        conn.close()
@app.post("/api/v1/bulk/tags")
async def bulk_tags(req: BulkTagsRequest):
    """Add and/or remove tags on every document selected by ``req``.

    For each selected document, ``req.add`` is applied with ``tag_document``
    and ``req.remove`` with ``untag_document`` (each only when non-empty),
    then ``updated_at`` is refreshed. All changes are committed once after
    the loop and a ``bulk_tags`` job is written via ``create_bulk_job``.

    Args:
        req: Selection filters, ``force`` flag, and the tag lists.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and
        ``errors`` (one ``{"document_id", "error"}`` entry per failure).

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured share of all documents and ``force`` is
            not set.
    """
    conn = get_connection(cfg.db_path)
    try:
        doc_ids = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        total = count_documents(conn)
        _check_safety_threshold(len(doc_ids), total, req.force)

        ok_count = 0
        failures = []
        for doc_id in doc_ids:
            try:
                if req.add:
                    tag_document(conn, doc_id, req.add)
                if req.remove:
                    untag_document(conn, doc_id, req.remove)
                conn.execute(
                    "UPDATE documents SET updated_at = current_timestamp WHERE id = ?",
                    (doc_id,),
                )
            except Exception as exc:
                # One bad document must not abort the batch; record and go on.
                failures.append({"document_id": doc_id, "error": str(exc)})
            else:
                ok_count += 1
        conn.commit()

        fail_count = len(failures)
        job_id = create_bulk_job(
            conn, "bulk_tags", _filters_dict(req),
            len(doc_ids), ok_count, fail_count, json.dumps(failures),
        )
        return {
            "job_id": job_id,
            "status": "partial_failure" if fail_count else "done",
            "matched": len(doc_ids),
            "succeeded": ok_count,
            "failed": fail_count,
            "errors": failures,
        }
    finally:
        conn.close()
@app.post("/api/v1/bulk/set-tags")
async def bulk_set_tags(req: BulkSetTagsRequest):
    """Replace the tag set of every document selected by ``req``.

    For each selected document, all ``document_tags`` rows are deleted,
    ``req.new_tags`` is applied with ``tag_document`` (skipped when the list
    is empty, which leaves the document untagged), and ``updated_at`` is
    refreshed. All changes are committed once after the loop and a
    ``bulk_set_tags`` job is written via ``create_bulk_job``.

    Args:
        req: Selection filters, ``force`` flag, and the replacement tag list.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and
        ``errors`` (one ``{"document_id", "error"}`` entry per failure).

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured share of all documents and ``force`` is
            not set.
    """
    conn = get_connection(cfg.db_path)
    try:
        doc_ids = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        total = count_documents(conn)
        _check_safety_threshold(len(doc_ids), total, req.force)

        ok_count = 0
        failures = []
        for doc_id in doc_ids:
            try:
                # Remove all existing tags
                conn.execute(
                    "DELETE FROM document_tags WHERE document_id = ?", (doc_id,)
                )
                # Apply new tag set
                if req.new_tags:
                    tag_document(conn, doc_id, req.new_tags)
                conn.execute(
                    "UPDATE documents SET updated_at = current_timestamp WHERE id = ?",
                    (doc_id,),
                )
            except Exception as exc:
                # NOTE(review): if tagging fails after the DELETE above, the
                # removal presumably stays in the pending transaction and is
                # committed below, leaving the document untagged — confirm.
                failures.append({"document_id": doc_id, "error": str(exc)})
            else:
                ok_count += 1
        conn.commit()

        fail_count = len(failures)
        job_id = create_bulk_job(
            conn, "bulk_set_tags", _filters_dict(req),
            len(doc_ids), ok_count, fail_count, json.dumps(failures),
        )
        return {
            "job_id": job_id,
            "status": "partial_failure" if fail_count else "done",
            "matched": len(doc_ids),
            "succeeded": ok_count,
            "failed": fail_count,
            "errors": failures,
        }
    finally:
        conn.close()