Files
kb/mcp/server.py
T
steve b5a203d2aa Add bulk operations and remove collections abstraction
- Add bulk delete, bulk tags, and bulk set-tags engine endpoints
  (POST /api/v1/bulk/delete, /bulk/tags, /bulk/set-tags)
- Filter-based selection: by tags, doc_type, ID list, ID range
- Safety threshold (KB_BULK_SAFETY_PERCENT, default 70%) prevents
  accidental mass operations unless force=true
- Synchronous execution with audit trail via jobs table
- Add kb_bulk_delete, kb_bulk_tags, kb_bulk_set_tags MCP tools
- Add kb bulk-remove, bulk-tag, bulk-set-tags CLI commands
- Remove collection abstraction from MCP server (use tags instead)
- Remove kb_set_collection MCP tool
- Update SKILL.md, MCP.md, README.md documentation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:34:47 +01:00

447 lines
15 KiB
Python

"""kb MCP server — exposes knowledge base operations as MCP tools."""
import asyncio
import json
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount
import config
import engine
import uploads
# Configure root logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Module logger shared by the lifespan hook and the entry point below.
logger = logging.getLogger("kb.mcp")
# ---------------------------------------------------------------------------
# Transport security — DNS rebinding protection with configurable allowed hosts
# ---------------------------------------------------------------------------
# Loopback addresses are always accepted, on any port.
_LOCALHOST_HOSTS = ["127.0.0.1:*", "localhost:*", "[::1]:*"]
_LOCALHOST_ORIGINS = ["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"]

# Additional hostnames supplied through configuration; each one is allowed on
# any port, both as a Host value and as an http:// Origin.
_extra_hosts = config.parse_allowed_hosts()
_allowed_hosts = [*_LOCALHOST_HOSTS, *(f"{host}:*" for host in _extra_hosts)]
_allowed_origins = [
    *_LOCALHOST_ORIGINS,
    *(f"http://{host}:*" for host in _extra_hosts),
]

_transport_security = TransportSecuritySettings(
    enable_dns_rebinding_protection=True,
    allowed_hosts=_allowed_hosts,
    allowed_origins=_allowed_origins,
)
# ---------------------------------------------------------------------------
# FastMCP server
# ---------------------------------------------------------------------------
# Server-level instructions passed to FastMCP alongside the server name.
_SERVER_INSTRUCTIONS = (
    "Knowledge base MCP server. Provides tools for searching, adding, and "
    "managing documents and notes. Use tags to organise and filter documents "
    "(e.g. tag notes with 'agent:mybot' and filter searches by that tag). "
    "This server requires Bearer token authentication — all requests are "
    "authenticated via the Authorization header at the HTTP transport layer."
)

# The FastMCP instance every @mcp.tool() below registers against.
mcp = FastMCP(
    "kb",
    instructions=_SERVER_INSTRUCTIONS,
    transport_security=_transport_security,
)
@mcp.tool()
async def kb_search(
    query: str,
    top: int = 10,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    fts_only: bool = False,
) -> str:
    """Search the knowledge base for relevant documents and notes.

    Returns ranked chunks matching the query, with text content, relevance scores,
    and document metadata.

    Args:
        query: The search query. Can be a natural language question or keywords.
        top: Maximum number of results to return (default 10).
        tags: Filter results to documents with ALL of these tags.
        doc_type: Filter by document type (e.g. "note", "pdf", "markdown", "code").
        fts_only: If true, use only full-text search (no vector similarity).

    Tips for complex queries:
    - Consider expanding into 2-3 variant phrasings and calling this tool multiple
      times, then deduplicating results by chunk_id. For example, search for both
      "pension revaluation rules" and "how are pensions revalued" to cast a wider net.
    - For precision, rerank the returned results using your own judgement based on
      relevance to the original question.
    """
    # An empty tag list is forwarded as None, i.e. "no tag filter".
    tag_filter = tags if tags else None
    response = engine.search(
        query=query,
        top=top,
        tags=tag_filter,
        doc_type=doc_type,
        fts_only=fts_only,
    )
    # The engine reply is either a bare list or a mapping with a "results" key.
    if isinstance(response, list):
        hits = response
    else:
        hits = response.get("results", [])
    return json.dumps(hits, indent=2)
@mcp.tool()
async def kb_addnote(
    text: str,
    tags: list[str] | None = None,
    title: str | None = None,
) -> str:
    """Add a text note to the knowledge base for indexing and search.

    The note is queued for ingestion — it will be chunked, embedded, and made
    searchable. Use kb_jobs to check ingestion status.

    Args:
        text: The note text content.
        tags: Tags to apply to the note.
        title: Optional title (auto-derived from first line if omitted).
    """
    # Collapse an empty tag list to None before handing off to the engine.
    note_tags = tags if tags else None
    outcome = engine.add_note(text=text, tags=note_tags, title=title)
    return json.dumps(outcome, indent=2)
@mcp.tool()
async def kb_update_note(
    document_id: int,
    text: str,
) -> str:
    """Update an existing note's content in place.

    Replaces the note text, re-chunks, and re-embeds while preserving the
    document ID, creation timestamp, and tags. Only works on documents with
    doc_type "note".

    Args:
        document_id: The ID of the note document to update.
        text: The new text content for the note.
    """
    # Thin pass-through: the engine reply is serialised as-is.
    return json.dumps(engine.update_note(document_id, text), indent=2)
@mcp.tool()
async def kb_get(
    document_id: int | None = None,
    source_path: str | None = None,
) -> str:
    """Retrieve document details from the knowledge base.

    Look up a document by its ID or source path. Returns full document metadata,
    tags, and chunk contents.

    Args:
        document_id: The numeric document ID.
        source_path: The document's source path (alternative to document_id).
    """
    # document_id takes precedence when both selectors are supplied.
    if document_id is not None:
        return json.dumps(engine.get_document(document_id), indent=2)

    if source_path is None:
        return json.dumps({"error": "Provide either document_id or source_path"})

    # No direct path lookup is used here: list every document and match on
    # the exact source_path, then fetch the first match by its id.
    candidates = [
        entry
        for entry in engine.list_documents()
        if entry.get("source_path") == source_path
    ]
    if not candidates:
        return json.dumps({"error": "No document found with that source_path"})

    first_match_id = candidates[0]["id"]
    return json.dumps(engine.get_document(first_match_id), indent=2)
@mcp.tool()
async def kb_status() -> str:
    """Get knowledge base engine status.

    Returns engine version, embedding model info, device info, document counts,
    database size, and ingestion queue state.
    """
    status_info = engine.get_status()
    # Report whether an API key is configured for this server (never the key itself).
    status_info["authenticated"] = bool(config.KB_MCP_API_KEY)
    return json.dumps(status_info, indent=2)
@mcp.tool()
async def kb_jobs(
    status: str | None = None,
) -> str:
    """List ingestion jobs and their status.

    Returns recent jobs showing what has been queued, is processing, completed,
    or failed.

    Args:
        status: Filter by job status ("queued", "processing", "done", "failed", "skipped").
    """
    # The status filter (possibly None) is forwarded unchanged.
    return json.dumps(engine.list_jobs(status=status), indent=2)
@mcp.tool()
async def kb_delete(
    document_id: int,
) -> str:
    """Permanently delete a document from the knowledge base.

    Removes the document and all associated data (chunks, embeddings, tags,
    stored files). This action cannot be undone.

    Args:
        document_id: The ID of the document to delete.
    """
    # Single-document delete; the engine reply is returned as JSON.
    return json.dumps(engine.delete_document(document_id), indent=2)
@mcp.tool()
async def kb_upload_start(
    filename: str,
    total_size: int,
    tags: list[str] | None = None,
) -> str:
    """Start a chunked file upload to the knowledge base.

    Use this for uploading files from a remote agent. The upload process is:
    1. Call kb_upload_start to get an upload_id
    2. Call kb_upload_chunk repeatedly with base64-encoded file chunks (recommended ~1MB each)
    3. Call kb_upload_finish to submit the file for ingestion

    Example for a 3MB file:
        upload = kb_upload_start(filename="report.pdf", total_size=3145728, tags=["project:x"])
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 0>", chunk_index=0)
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 1>", chunk_index=1)
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 2>", chunk_index=2)
        result = kb_upload_finish(upload_id=upload["upload_id"])

    Args:
        filename: Original filename (used for type detection).
        total_size: Total file size in bytes.
        tags: Tags to apply to the uploaded document.
    """
    # Unlike the engine calls, the upload store receives a list here: a
    # missing or empty tag argument becomes a fresh empty list.
    initial_tags = tags if tags else []
    new_upload_id = uploads.start_upload(filename, total_size, initial_tags)
    return json.dumps({"upload_id": new_upload_id})
@mcp.tool()
async def kb_upload_chunk(
    upload_id: str,
    data: str,
    chunk_index: int,
) -> str:
    """Upload a base64-encoded chunk of a file.

    Part of the chunked upload flow started by kb_upload_start.

    Args:
        upload_id: The upload ID from kb_upload_start.
        data: Base64-encoded file data for this chunk.
        chunk_index: Zero-based index of this chunk.
    """
    try:
        uploads.add_chunk(upload_id, data, chunk_index)
    except KeyError as exc:
        # A KeyError from the upload store is reported to the caller as a
        # JSON error payload rather than raised.
        return json.dumps({"error": str(exc)})
    return json.dumps({"status": "ok", "chunk_index": chunk_index})
@mcp.tool()
async def kb_upload_finish(
    upload_id: str,
) -> str:
    """Finish a chunked upload and submit the file for ingestion.

    Reassembles all uploaded chunks and forwards the complete file to the
    engine for processing. Returns the ingestion job ID.

    Args:
        upload_id: The upload ID from kb_upload_start.
    """
    try:
        # Both the reassembly and the engine hand-off stay inside the try so
        # a KeyError from either step becomes a JSON error payload.
        name, payload, upload_tags = uploads.finish_upload(upload_id)
        submission = engine.upload_file(name, payload, upload_tags)
    except KeyError as exc:
        return json.dumps({"error": str(exc)})
    return json.dumps(submission, indent=2)
# ---------------------------------------------------------------------------
# Bulk operation tools
# ---------------------------------------------------------------------------
@mcp.tool()
async def kb_bulk_delete(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    force: bool = False,
) -> str:
    """Permanently delete multiple documents matching a filter.

    Removes matched documents and all associated data (chunks, embeddings, tags,
    stored files). This action cannot be undone.

    Selection filters combine with AND logic — at least one is required.

    A safety threshold applies: if the operation would affect more than 70% of
    all documents, it is rejected unless force=true.

    Args:
        document_ids: Delete documents with these specific IDs.
        tags: Delete documents that have ALL of these tags (selection filter).
        doc_type: Delete documents of this type (e.g. "note", "pdf").
        from_id: Delete documents with id >= this value.
        to_id: Delete documents with id <= this value.
        force: Override the safety threshold if it would block the operation.
    """
    # Selection filters are forwarded untouched; no validation happens here.
    selection = {
        "document_ids": document_ids,
        "tags": tags,
        "doc_type": doc_type,
        "from_id": from_id,
        "to_id": to_id,
    }
    outcome = engine.bulk_delete(**selection, force=force)
    return json.dumps(outcome, indent=2)
@mcp.tool()
async def kb_bulk_tags(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    add: list[str] | None = None,
    remove: list[str] | None = None,
    force: bool = False,
) -> str:
    """Add and/or remove tags on multiple documents matching a filter.

    Selection filters combine with AND logic — at least one is required.

    Note: the 'tags' parameter is a SELECTION FILTER (which documents to target),
    while 'add' and 'remove' specify the TAG CHANGES to apply to those documents.

    Args:
        document_ids: Target documents with these specific IDs.
        tags: Target documents that have ALL of these tags (selection filter).
        doc_type: Target documents of this type.
        from_id: Target documents with id >= this value.
        to_id: Target documents with id <= this value.
        add: Tags to add to matched documents.
        remove: Tags to remove from matched documents.
        force: Override the safety threshold if it would block the operation.
    """
    # Keep "which documents" separate from "what to change" for readability;
    # both groups are forwarded to the engine unchanged.
    selection = {
        "document_ids": document_ids,
        "tags": tags,
        "doc_type": doc_type,
        "from_id": from_id,
        "to_id": to_id,
    }
    outcome = engine.bulk_tags(**selection, add=add, remove=remove, force=force)
    return json.dumps(outcome, indent=2)
@mcp.tool()
async def kb_bulk_set_tags(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    new_tags: list[str] | None = None,
    force: bool = False,
) -> str:
    """Replace all tags on multiple documents with a new set.

    Removes ALL existing tags from matched documents, then applies the new tag set.

    Selection filters combine with AND logic — at least one is required.

    Note: the 'tags' parameter is a SELECTION FILTER (which documents to target),
    while 'new_tags' is the REPLACEMENT tag set to apply.

    Args:
        document_ids: Target documents with these specific IDs.
        tags: Target documents that have ALL of these tags (selection filter).
        doc_type: Target documents of this type.
        from_id: Target documents with id >= this value.
        to_id: Target documents with id <= this value.
        new_tags: The replacement tag set to apply to all matched documents.
        force: Override the safety threshold if it would block the operation.
    """
    # Selection filters and the replacement set are forwarded unchanged.
    selection = {
        "document_ids": document_ids,
        "tags": tags,
        "doc_type": doc_type,
        "from_id": from_id,
        "to_id": to_id,
    }
    outcome = engine.bulk_set_tags(**selection, new_tags=new_tags, force=force)
    return json.dumps(outcome, indent=2)
# ---------------------------------------------------------------------------
# Auth middleware
# ---------------------------------------------------------------------------
class BearerAuthMiddleware(BaseHTTPMiddleware):
    """Reject requests that do not carry the configured Bearer API key.

    When ``config.KB_MCP_API_KEY`` is empty or unset, authentication is
    disabled and every request is passed through. Otherwise the request must
    send ``Authorization: Bearer <key>``; anything else gets a 401 JSON reply.
    """

    async def dispatch(self, request: Request, call_next):
        """Authenticate *request*, then forward it to ``call_next``.

        Args:
            request: The incoming Starlette request.
            call_next: Callable that runs the rest of the ASGI stack.

        Returns:
            The downstream response when authorised (or when no key is
            configured), otherwise a 401 ``{"error": "Unauthorized"}`` reply.
        """
        # Function-scope import so this block carries its own dependency.
        import hmac

        expected_key = config.KB_MCP_API_KEY
        if not expected_key:
            # No key configured: the server runs unauthenticated.
            return await call_next(request)

        auth_header = request.headers.get("authorization", "")
        if auth_header.startswith("Bearer "):
            presented_key = auth_header[len("Bearer "):]
            # Compare in constant time so response timing does not reveal how
            # many leading characters of a guessed key were correct. Bytes are
            # used because compare_digest rejects non-ASCII str arguments.
            # NOTE(review): assumes KB_MCP_API_KEY is a str — confirm in config.
            if hmac.compare_digest(
                presented_key.encode("utf-8"),
                expected_key.encode("utf-8"),
            ):
                return await call_next(request)

        return JSONResponse(
            status_code=401,
            content={"error": "Unauthorized"},
        )
# ---------------------------------------------------------------------------
# ASGI app assembly
# ---------------------------------------------------------------------------
def create_app():
    """Build the Starlette ASGI app: the MCP HTTP app mounted at "/" behind Bearer auth."""
    from contextlib import asynccontextmanager

    inner_app = mcp.streamable_http_app()

    @asynccontextmanager
    async def lifespan(app):
        # Start the upload cleanup task before serving any requests.
        uploads.start_cleanup_task()
        logger.info("Upload cleanup task started")

        # A mounted sub-app's lifespan is not run by the outer app, so enter
        # the MCP app's own lifespan context here when it exposes one.
        has_inner_lifespan = hasattr(inner_app, 'router') and hasattr(
            inner_app.router, 'lifespan_context'
        )
        if not has_inner_lifespan:
            yield
            return
        async with inner_app.router.lifespan_context(app):
            yield

    return Starlette(
        routes=[Mount("/", app=inner_app)],
        middleware=[Middleware(BearerAuthMiddleware)],
        lifespan=lifespan,
    )
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn

    listen_port = config.KB_MCP_PORT
    logger.info(
        "Starting kb MCP server on port %d, engine=%s",
        listen_port,
        config.KB_ENGINE_URL,
    )
    # Binds on all interfaces; access control is left to the Bearer auth
    # middleware and the transport-security host allow-list.
    uvicorn.run(create_app(), host="0.0.0.0", port=listen_port)