b5a203d2aa
- Add bulk delete, bulk tags, and bulk set-tags engine endpoints (POST /api/v1/bulk/delete, /bulk/tags, /bulk/set-tags) - Filter-based selection: by tags, doc_type, ID list, ID range - Safety threshold (KB_BULK_SAFETY_PERCENT, default 70%) prevents accidental mass operations unless force=true - Synchronous execution with audit trail via jobs table - Add kb_bulk_delete, kb_bulk_tags, kb_bulk_set_tags MCP tools - Add kb bulk-remove, bulk-tag, bulk-set-tags CLI commands - Remove collection abstraction from MCP server (use tags instead) - Remove kb_set_collection MCP tool - Update SKILL.md, MCP.md, README.md documentation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
282 lines
8.8 KiB
Python
282 lines
8.8 KiB
Python
"""Bulk operation endpoints — delete, tag, and set-tags on multiple documents."""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from fastapi import HTTPException
|
|
from pydantic import BaseModel, model_validator
|
|
|
|
from main import app
|
|
from kb.config import cfg
|
|
from kb.database import (
|
|
get_connection,
|
|
resolve_bulk_selection,
|
|
count_documents,
|
|
create_bulk_job,
|
|
tag_document,
|
|
untag_document,
|
|
)
|
|
|
|
logger = logging.getLogger("kb.routes.bulk")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Request models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class BulkSelectionRequest(BaseModel):
    """Common request body describing which documents a bulk operation targets.

    The selection fields are forwarded as-is to ``resolve_bulk_selection`` by
    the endpoints. A request must carry at least one selection filter;
    ``force`` is not a filter and does not satisfy that requirement.
    """

    # Explicit list of document IDs to select.
    document_ids: Optional[list[int]] = None
    # Select by tag names.
    tags: Optional[list[str]] = None
    # Select by document type.
    doc_type: Optional[str] = None
    # Bounds of an ID range; either bound may be given on its own.
    from_id: Optional[int] = None
    to_id: Optional[int] = None
    # When true, the safety-threshold check is skipped.
    force: bool = False

    @model_validator(mode="after")
    def require_at_least_one_filter(self):
        """Reject requests that do not specify any selection filter.

        Empty lists and empty strings count as "not specified"; ``from_id`` and
        ``to_id`` count as specified whenever they are not ``None`` (so ``0``
        is a valid bound).

        Raises:
            ValueError: if no filter is present.
        """
        has_filter = (
            bool(self.document_ids)
            or bool(self.tags)
            or bool(self.doc_type)
            or self.from_id is not None
            or self.to_id is not None
        )
        if not has_filter:
            raise ValueError("At least one selection filter is required")
        return self
|
|
|
|
|
|
class BulkDeleteRequest(BulkSelectionRequest):
    """Request body for the bulk delete endpoint; adds nothing to the base selection."""
|
|
|
|
|
|
class BulkTagsRequest(BulkSelectionRequest):
    """Request body for the bulk tags endpoint: a selection plus tag changes."""

    # Tags to attach to every selected document.
    add: Optional[list[str]] = None
    # Tags to detach from every selected document.
    remove: Optional[list[str]] = None

    @model_validator(mode="after")
    def require_add_or_remove(self):
        """Ensure the request changes something.

        Raises:
            ValueError: if both ``add`` and ``remove`` are missing or empty.
        """
        if self.add or self.remove:
            return self
        raise ValueError("At least one of 'add' or 'remove' is required")
|
|
|
|
|
|
class BulkSetTagsRequest(BulkSelectionRequest):
    """Request body for the bulk set-tags endpoint: a selection plus a new tag set."""

    # Required. The tag set that replaces each selected document's tags;
    # an empty list leaves the documents with no tags.
    new_tags: list[str]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _check_safety_threshold(matched: int, total: int, force: bool) -> None:
    """Abort with HTTP 409 when an operation would touch too large a share of documents.

    The limit is read from ``cfg.bulk_safety_percent``. The check is skipped
    when that limit is zero or negative, when ``force`` is true, or when there
    are no documents at all.

    Args:
        matched: Number of documents the operation would affect.
        total: Total number of documents, used as the denominator.
        force: Caller-supplied override that bypasses the check.

    Raises:
        HTTPException: status 409 with a structured ``detail`` dict when the
            matched percentage is strictly greater than the limit.
    """
    limit = cfg.bulk_safety_percent

    check_disabled = limit <= 0 or force or total == 0
    if check_disabled:
        return

    share = (matched / total) * 100
    if not (share > limit):
        return

    message = (
        f"Operation would affect {matched} of {total} documents "
        f"({share:.1f}%). Exceeds safety threshold of {limit}%. "
        f"Use force: true to proceed."
    )
    raise HTTPException(
        status_code=409,
        detail={
            "error": "safety_threshold_exceeded",
            "message": message,
            "matched": matched,
            "total": total,
            "percent": round(share, 1),
            "threshold": limit,
        },
    )
|
|
|
|
|
|
def _filters_dict(req: BulkSelectionRequest) -> str:
    """Serialize the request's active selection filters to a JSON string.

    Only filters that are actually set are included: list/string filters are
    kept when truthy, ID bounds are kept when not ``None``. ``force`` is never
    included. The endpoints pass the result to ``create_bulk_job``.

    Returns:
        A JSON object string (``"{}"`` when nothing is set).
    """
    active = {}

    # Filters that are considered unset when empty.
    for key in ("document_ids", "tags", "doc_type"):
        value = getattr(req, key)
        if value:
            active[key] = value

    # ID bounds: 0 is a legitimate value, so test against None explicitly.
    for key in ("from_id", "to_id"):
        value = getattr(req, key)
        if value is not None:
            active[key] = value

    return json.dumps(active)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.post("/api/v1/bulk/delete")
async def bulk_delete(req: BulkDeleteRequest):
    """Delete every document matched by the request's selection filters.

    Flow: resolve the selection, enforce the safety threshold, delete each
    document's vector rows and document row, commit, remove the stored files
    of the documents that were actually deleted, then record a job row.

    Args:
        req: Selection filters plus the ``force`` override.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and the
        per-document ``errors`` list.

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured safety threshold and ``force`` is false.
    """
    conn = get_connection(cfg.db_path)
    try:
        doc_ids = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        total = count_documents(conn)
        _check_safety_threshold(len(doc_ids), total, req.force)

        succeeded = 0
        failed = 0
        errors = []
        # Files belonging to documents whose rows were successfully deleted.
        stored_files: list[str] = []

        for doc_id in doc_ids:
            try:
                doc = conn.execute(
                    "SELECT id, stored_path FROM documents WHERE id = ?", (doc_id,)
                ).fetchone()
                if not doc:
                    failed += 1
                    errors.append({"document_id": doc_id, "error": "not found"})
                    continue

                stored_path = doc["stored_path"]

                # Delete embeddings
                chunk_ids = conn.execute(
                    "SELECT id FROM chunks WHERE document_id = ?", (doc_id,)
                ).fetchall()
                for row in chunk_ids:
                    conn.execute("DELETE FROM chunks_vec WHERE chunk_id = ?", (row["id"],))

                # Delete document (expected to cascade to chunks and
                # document_tags via the schema, which is not visible here).
                conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))

                # Queue the file only now that the row is gone. Queuing it
                # earlier would remove the file of a document whose delete
                # failed and which therefore still exists in the database.
                if stored_path:
                    stored_files.append(stored_path)
                succeeded += 1
            except Exception as exc:
                # NOTE(review): statements already executed for this document
                # (e.g. chunks_vec deletes) are not rolled back and will be
                # committed below; consider a per-document savepoint.
                failed += 1
                errors.append({"document_id": doc_id, "error": str(exc)})

        conn.commit()

        # Best-effort file cleanup after commit
        for path in stored_files:
            try:
                # missing_ok avoids the exists()/unlink() race and treats an
                # already-absent file as success.
                Path(path).unlink(missing_ok=True)
            except OSError as exc:
                logger.warning("Failed to delete stored file %s: %s", path, exc)

        errors_json = json.dumps(errors) if errors else "[]"
        job_id = create_bulk_job(
            conn, "bulk_delete", _filters_dict(req),
            len(doc_ids), succeeded, failed, errors_json,
        )

        return {
            "job_id": job_id,
            "status": "done" if failed == 0 else "partial_failure",
            "matched": len(doc_ids),
            "succeeded": succeeded,
            "failed": failed,
            "errors": errors,
        }
    finally:
        conn.close()
|
|
|
|
|
|
@app.post("/api/v1/bulk/tags")
async def bulk_tags(req: BulkTagsRequest):
    """Add and/or remove tags on every document matched by the request.

    For each selected document the ``add`` tags are applied first, then the
    ``remove`` tags, and the document's ``updated_at`` is refreshed. Failures
    are recorded per document and do not stop the loop. All changes are
    committed once after the loop, then a job row is recorded.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and the
        per-document ``errors`` list.

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured safety threshold and ``force`` is false.
    """
    conn = get_connection(cfg.db_path)
    try:
        targets = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        matched = len(targets)
        _check_safety_threshold(matched, count_documents(conn), req.force)

        errors = []
        succeeded = 0
        for doc_id in targets:
            try:
                if req.add:
                    tag_document(conn, doc_id, req.add)
                if req.remove:
                    untag_document(conn, doc_id, req.remove)
                conn.execute(
                    "UPDATE documents SET updated_at = current_timestamp WHERE id = ?",
                    (doc_id,),
                )
            except Exception as exc:
                errors.append({"document_id": doc_id, "error": str(exc)})
            else:
                succeeded += 1

        # Exactly one error entry is recorded per failed document.
        failed = len(errors)

        conn.commit()

        job_id = create_bulk_job(
            conn, "bulk_tags", _filters_dict(req),
            matched, succeeded, failed, json.dumps(errors),
        )

        status = "partial_failure" if failed else "done"
        return {
            "job_id": job_id,
            "status": status,
            "matched": matched,
            "succeeded": succeeded,
            "failed": failed,
            "errors": errors,
        }
    finally:
        conn.close()
|
|
|
|
|
|
@app.post("/api/v1/bulk/set-tags")
async def bulk_set_tags(req: BulkSetTagsRequest):
    """Replace the tag set of every document matched by the request.

    For each selected document all existing ``document_tags`` rows are
    removed, ``new_tags`` is applied when non-empty, and ``updated_at`` is
    refreshed. Failures are recorded per document and do not stop the loop.
    All changes are committed once after the loop, then a job row is recorded.

    Returns:
        A dict with ``job_id``, ``status`` (``"done"`` or
        ``"partial_failure"``), ``matched``, ``succeeded``, ``failed`` and the
        per-document ``errors`` list.

    Raises:
        HTTPException: 409 from ``_check_safety_threshold`` when the selection
            exceeds the configured safety threshold and ``force`` is false.
    """
    conn = get_connection(cfg.db_path)
    try:
        targets = resolve_bulk_selection(
            conn, req.document_ids, req.tags, req.doc_type, req.from_id, req.to_id,
        )
        matched = len(targets)
        _check_safety_threshold(matched, count_documents(conn), req.force)

        errors = []
        succeeded = 0
        for doc_id in targets:
            try:
                # Wipe the current tag links for this document.
                # NOTE(review): if applying the new tags fails afterwards,
                # this delete is not undone here and appears to be committed
                # with the rest — confirm, and consider a per-document
                # savepoint.
                conn.execute(
                    "DELETE FROM document_tags WHERE document_id = ?", (doc_id,)
                )
                # Attach the replacement tags; an empty list means "no tags".
                if req.new_tags:
                    tag_document(conn, doc_id, req.new_tags)
                conn.execute(
                    "UPDATE documents SET updated_at = current_timestamp WHERE id = ?",
                    (doc_id,),
                )
            except Exception as exc:
                errors.append({"document_id": doc_id, "error": str(exc)})
            else:
                succeeded += 1

        # Exactly one error entry is recorded per failed document.
        failed = len(errors)

        conn.commit()

        job_id = create_bulk_job(
            conn, "bulk_set_tags", _filters_dict(req),
            matched, succeeded, failed, json.dumps(errors),
        )

        status = "partial_failure" if failed else "done"
        return {
            "job_id": job_id,
            "status": status,
            "matched": matched,
            "succeeded": succeeded,
            "failed": failed,
            "errors": errors,
        }
    finally:
        conn.close()
|