Files
steve b5a203d2aa Add bulk operations and remove collections abstraction
- Add bulk delete, bulk tags, and bulk set-tags engine endpoints
  (POST /api/v1/bulk/delete, /bulk/tags, /bulk/set-tags)
- Filter-based selection: by tags, doc_type, ID list, ID range
- Safety threshold (KB_BULK_SAFETY_PERCENT, default 70%) prevents
  accidental mass operations unless force=true
- Synchronous execution with audit trail via jobs table
- Add kb_bulk_delete, kb_bulk_tags, kb_bulk_set_tags MCP tools
- Add kb bulk-remove, bulk-tag, bulk-set-tags CLI commands
- Remove collection abstraction from MCP server (use tags instead)
- Remove kb_set_collection MCP tool
- Update SKILL.md, MCP.md, README.md documentation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:34:47 +01:00

209 lines
5.5 KiB
Python

"""HTTP client for the kb engine API."""
import httpx
from config import KB_ENGINE_URL, KB_API_KEY
def _auth_headers() -> dict[str, str]:
h: dict[str, str] = {}
if KB_API_KEY:
h["Authorization"] = f"Bearer {KB_API_KEY}"
return h
def _client() -> httpx.Client:
return httpx.Client(base_url=KB_ENGINE_URL, headers=_auth_headers(), timeout=60.0)
def search(query: str, top: int = 10, tags: list[str] | None = None,
doc_type: str | None = None, fts_only: bool = False,
vec_only: bool = False, threshold: float | None = None) -> dict:
body: dict = {"query": query, "top": top}
if tags:
body["tags"] = tags
if doc_type:
body["doc_type"] = doc_type
if fts_only:
body["fts_only"] = True
if vec_only:
body["vec_only"] = True
if threshold is not None:
body["threshold"] = threshold
with _client() as c:
r = c.post("/api/v1/search", json=body)
r.raise_for_status()
return r.json()
def add_note(text: str, tags: list[str] | None = None,
title: str | None = None) -> dict:
fields = {"note": text}
if tags:
fields["tags"] = ",".join(tags)
if title:
fields["title"] = title
with _client() as c:
r = c.post("/api/v1/jobs", data=fields)
r.raise_for_status()
return r.json()
def update_note(doc_id: int, text: str) -> dict:
with _client() as c:
r = c.patch(f"/api/v1/notes/{doc_id}", json={"text": text})
r.raise_for_status()
return r.json()
def get_document(doc_id: int) -> dict:
with _client() as c:
r = c.get(f"/api/v1/documents/{doc_id}")
r.raise_for_status()
return r.json()
def list_documents(doc_type: str | None = None,
tags: str | None = None) -> list[dict]:
params: dict = {}
if doc_type:
params["type"] = doc_type
if tags:
params["tags"] = tags
with _client() as c:
r = c.get("/api/v1/documents", params=params)
r.raise_for_status()
return r.json()
def get_status() -> dict:
with _client() as c:
r = c.get("/api/v1/status")
r.raise_for_status()
return r.json()
def list_jobs(status: str | None = None) -> list[dict]:
params: dict = {}
if status:
params["status"] = status
with _client() as c:
r = c.get("/api/v1/jobs", params=params)
r.raise_for_status()
return r.json()
def update_tags(doc_id: int, add: list[str] | None = None,
remove: list[str] | None = None) -> dict:
body: dict = {}
if add:
body["add"] = add
if remove:
body["remove"] = remove
with _client() as c:
r = c.put(f"/api/v1/documents/{doc_id}/tags", json=body)
r.raise_for_status()
return r.json()
def delete_document(doc_id: int) -> dict:
with _client() as c:
r = c.delete(f"/api/v1/documents/{doc_id}")
r.raise_for_status()
return r.json()
def _bulk_body(
document_ids: list[int] | None = None,
tags: list[str] | None = None,
doc_type: str | None = None,
from_id: int | None = None,
to_id: int | None = None,
force: bool = False,
**extra,
) -> dict:
body: dict = {}
if document_ids:
body["document_ids"] = document_ids
if tags:
body["tags"] = tags
if doc_type:
body["doc_type"] = doc_type
if from_id is not None:
body["from_id"] = from_id
if to_id is not None:
body["to_id"] = to_id
if force:
body["force"] = True
body.update(extra)
return body
def bulk_delete(
document_ids: list[int] | None = None,
tags: list[str] | None = None,
doc_type: str | None = None,
from_id: int | None = None,
to_id: int | None = None,
force: bool = False,
) -> dict:
body = _bulk_body(document_ids, tags, doc_type, from_id, to_id, force)
with _client() as c:
r = c.post("/api/v1/bulk/delete", json=body)
r.raise_for_status()
return r.json()
def bulk_tags(
document_ids: list[int] | None = None,
tags: list[str] | None = None,
doc_type: str | None = None,
from_id: int | None = None,
to_id: int | None = None,
add: list[str] | None = None,
remove: list[str] | None = None,
force: bool = False,
) -> dict:
extra = {}
if add:
extra["add"] = add
if remove:
extra["remove"] = remove
body = _bulk_body(document_ids, tags, doc_type, from_id, to_id, force, **extra)
with _client() as c:
r = c.post("/api/v1/bulk/tags", json=body)
r.raise_for_status()
return r.json()
def bulk_set_tags(
document_ids: list[int] | None = None,
tags: list[str] | None = None,
doc_type: str | None = None,
from_id: int | None = None,
to_id: int | None = None,
new_tags: list[str] | None = None,
force: bool = False,
) -> dict:
extra = {"new_tags": new_tags or []}
body = _bulk_body(document_ids, tags, doc_type, from_id, to_id, force, **extra)
with _client() as c:
r = c.post("/api/v1/bulk/set-tags", json=body)
r.raise_for_status()
return r.json()
def upload_file(filename: str, file_bytes: bytes,
tags: list[str] | None = None) -> dict:
fields: dict = {}
if tags:
fields["tags"] = ",".join(tags)
with _client() as c:
r = c.post(
"/api/v1/jobs",
data=fields,
files={"file": (filename, file_bytes)},
)
r.raise_for_status()
return r.json()