e6e91f1d5c
Agents were misreading kb_search as keyword-only because the vector/semantic
component was only mentioned in the negative ("fts_only: no vector similarity").
Lead with hybrid semantic + BM25 + RRF in the server instructions, kb_search
docstring, and MCP.md so agents recognise it as a vector search tool.
461 lines
16 KiB
Python
461 lines
16 KiB
Python
"""kb MCP server — exposes knowledge base operations as MCP tools."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from mcp.server.transport_security import TransportSecuritySettings
|
|
from starlette.applications import Starlette
|
|
from starlette.middleware import Middleware
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse
|
|
from starlette.routing import Mount
|
|
|
|
import config
|
|
import engine
|
|
import uploads
|
|
|
|
# Root logging configuration for the server process: INFO and above,
# timestamped, one line per record.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger used by the lifespan hook and the entry point of this module.
logger = logging.getLogger("kb.mcp")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Transport security — DNS rebinding protection with configurable allowed hosts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Loopback hosts/origins are always allowed, on any port.
_LOCALHOST_HOSTS = ["127.0.0.1:*", "localhost:*", "[::1]:*"]
_LOCALHOST_ORIGINS = ["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"]


def _build_allowed(extra_hosts):
    """Return ``(allowed_hosts, allowed_origins)`` for DNS-rebinding protection.

    Starts from the loopback defaults and adds patterns for each
    operator-configured host in *extra_hosts*.

    The SDK's ``name:*`` wildcard only matches a header that carries an
    explicit ``:port``. Each extra host is therefore also listed bare, so
    requests arriving on a default port (e.g. via a reverse proxy on 80/443)
    are accepted. Origins are allowed for both ``http`` and ``https``, so
    TLS-fronted deployments are not rejected either. This is a strict
    superset of the previous ``host:*`` / ``http://host:*`` patterns.
    """
    hosts = list(_LOCALHOST_HOSTS)
    origins = list(_LOCALHOST_ORIGINS)
    for host in extra_hosts:
        hosts += [host, f"{host}:*"]
        origins += [
            f"http://{host}",
            f"http://{host}:*",
            f"https://{host}",
            f"https://{host}:*",
        ]
    return hosts, origins


_extra_hosts = config.parse_allowed_hosts()
_allowed_hosts, _allowed_origins = _build_allowed(_extra_hosts)

_transport_security = TransportSecuritySettings(
    enable_dns_rebinding_protection=True,
    allowed_hosts=_allowed_hosts,
    allowed_origins=_allowed_origins,
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FastMCP server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Server-level instructions handed to FastMCP (surfaced to connecting MCP
# clients). The wording deliberately leads with the hybrid semantic search so
# agents do not mistake kb_search for a keyword-only tool.
# NOTE(review): the last sentence says Bearer auth is always required, but
# BearerAuthMiddleware passes every request through when
# config.KB_MCP_API_KEY is empty — confirm the wording matches deployments.
_SERVER_INSTRUCTIONS = (
    "Knowledge base MCP server with hybrid semantic + full-text search. "
    "kb_search uses dense vector embeddings (semantic similarity) fused with "
    "BM25 full-text ranking, so it finds conceptually related content even "
    "when the exact words don't match — agents can ask natural-language "
    "questions rather than guessing keywords. Also provides tools for adding "
    "notes, uploading files, and managing documents and tags. Use tags to "
    "organise and filter documents (e.g. tag notes with 'agent:mybot' and "
    "filter searches by that tag). This server requires Bearer token "
    "authentication — all requests are authenticated via the Authorization "
    "header at the HTTP transport layer."
)

mcp = FastMCP(
    "kb",
    instructions=_SERVER_INSTRUCTIONS,
    transport_security=_transport_security,
)
|
|
|
|
|
|
@mcp.tool()
async def kb_search(
    query: str,
    top: int = 10,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    fts_only: bool = False,
) -> str:
    """Hybrid semantic (vector) + full-text search over the knowledge base.

    Combines dense vector embeddings (semantic similarity — finds conceptually
    related content even when the wording differs) with BM25 keyword ranking,
    fused via reciprocal rank fusion. Because the search is semantic, you can
    ask natural-language questions ("what did we decide about X?") rather than
    guessing the exact keywords used in the source documents.

    Returns ranked chunks matching the query, with text content, relevance
    scores, and document metadata.

    Args:
        query: The search query — a natural language question or keywords.
        top: Maximum number of results to return (default 10).
        tags: Filter results to documents with ALL of these tags.
        doc_type: Filter by document type (e.g. "note", "pdf", "markdown", "code").
        fts_only: Disable the vector/semantic component and use only BM25
            keyword matching. Default false (hybrid mode). Set true only when
            you need exact-string matching (e.g. an error code, identifier).

    Tips for complex queries:
        - Consider expanding into 2-3 variant phrasings and calling this tool multiple
          times, then deduplicating results by chunk_id. For example, search for both
          "pension revaluation rules" and "how are pensions revalued" to cast a wider net.
        - For precision, rerank the returned results using your own judgement based on
          relevance to the original question.
        - Call kb_status to see which embedding model is in use.
    """
    # NOTE: this docstring is the tool description FastMCP shows to agents.
    # An empty tag list is forwarded as None rather than [].
    tag_filter = tags if tags else None
    raw = engine.search(
        query=query,
        top=top,
        tags=tag_filter,
        doc_type=doc_type,
        fts_only=fts_only,
    )

    # The engine's reply is either a bare list of hits or a dict holding them
    # under "results"; normalise to the list before serialising.
    if isinstance(raw, list):
        hits = raw
    else:
        hits = raw.get("results", [])
    return json.dumps(hits, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_addnote(
    text: str,
    tags: list[str] | None = None,
    title: str | None = None,
) -> str:
    """Add a text note to the knowledge base for indexing and search.

    The note is queued for ingestion — it will be chunked, embedded, and made
    searchable. Use kb_jobs to check ingestion status.

    Args:
        text: The note text content.
        tags: Tags to apply to the note.
        title: Optional title (auto-derived from first line if omitted).
    """
    # An empty tag list is forwarded as None rather than [].
    note_tags = tags if tags else None
    created = engine.add_note(text=text, tags=note_tags, title=title)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_update_note(
    document_id: int,
    text: str,
) -> str:
    """Update an existing note's content in place.

    Replaces the note text, re-chunks, and re-embeds while preserving the
    document ID, creation timestamp, and tags. Only works on documents with
    doc_type "note".

    Args:
        document_id: The ID of the note document to update.
        text: The new text content for the note.
    """
    # Thin pass-through: the engine's reply is serialised verbatim.
    return json.dumps(engine.update_note(document_id, text), indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_get(
    document_id: int | None = None,
    source_path: str | None = None,
) -> str:
    """Retrieve document details from the knowledge base.

    Look up a document by its ID or source path. Returns full document metadata,
    tags, and chunk contents.

    Args:
        document_id: The numeric document ID.
        source_path: The document's source path (alternative to document_id).
    """
    # document_id takes precedence when both selectors are supplied.
    if document_id is None and source_path is None:
        return json.dumps({"error": "Provide either document_id or source_path"})

    if document_id is None:
        # No direct path lookup is used here: scan the document listing and
        # take the first entry whose source_path matches exactly.
        first_match = next(
            (
                doc
                for doc in engine.list_documents()
                if doc.get("source_path") == source_path
            ),
            None,
        )
        if first_match is None:
            return json.dumps({"error": "No document found with that source_path"})
        document_id = first_match["id"]

    return json.dumps(engine.get_document(document_id), indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_status() -> str:
    """Get knowledge base engine status.

    Returns engine version, embedding model info, device info, document counts,
    database size, and ingestion queue state.
    """
    status = engine.get_status()
    # Annotate the engine's report with whether this MCP front-end has an API
    # key configured (i.e. whether BearerAuthMiddleware enforces auth).
    auth_enabled = bool(config.KB_MCP_API_KEY)
    status["authenticated"] = auth_enabled
    return json.dumps(status, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_jobs(
    status: str | None = None,
) -> str:
    """List ingestion jobs and their status.

    Returns recent jobs showing what has been queued, is processing, completed,
    or failed.

    Args:
        status: Filter by job status ("queued", "processing", "done", "failed", "skipped").
    """
    # The status filter is forwarded unvalidated; None means "no filter".
    return json.dumps(engine.list_jobs(status=status), indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_delete(
    document_id: int,
) -> str:
    """Permanently delete a document from the knowledge base.

    Removes the document and all associated data (chunks, embeddings, tags,
    stored files). This action cannot be undone.

    Args:
        document_id: The ID of the document to delete.
    """
    # Thin pass-through: the engine's reply is serialised verbatim.
    return json.dumps(engine.delete_document(document_id), indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_upload_start(
    filename: str,
    total_size: int,
    tags: list[str] | None = None,
) -> str:
    """Start a chunked file upload to the knowledge base.

    Use this for uploading files from a remote agent. The upload process is:
    1. Call kb_upload_start to get an upload_id
    2. Call kb_upload_chunk repeatedly with base64-encoded file chunks (recommended ~1MB each)
    3. Call kb_upload_finish to submit the file for ingestion

    Example for a 3MB file:
        upload = kb_upload_start(filename="report.pdf", total_size=3145728, tags=["project:x"])
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 0>", chunk_index=0)
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 1>", chunk_index=1)
        kb_upload_chunk(upload_id=upload["upload_id"], data="<base64 chunk 2>", chunk_index=2)
        result = kb_upload_finish(upload_id=upload["upload_id"])

    Args:
        filename: Original filename (used for type detection).
        total_size: Total file size in bytes.
        tags: Tags to apply to the uploaded document.
    """
    # Unlike the engine calls, the uploads module is given [] (not None) when
    # no tags were supplied.
    upload_tags = tags if tags else []
    new_upload_id = uploads.start_upload(filename, total_size, upload_tags)
    payload = {"upload_id": new_upload_id}
    return json.dumps(payload)
|
|
|
|
|
|
@mcp.tool()
async def kb_upload_chunk(
    upload_id: str,
    data: str,
    chunk_index: int,
) -> str:
    """Upload a base64-encoded chunk of a file.

    Part of the chunked upload flow started by kb_upload_start.

    Args:
        upload_id: The upload ID from kb_upload_start.
        data: Base64-encoded file data for this chunk.
        chunk_index: Zero-based index of this chunk.
    """
    # A KeyError from the uploads module is turned into a JSON error payload.
    # Any other exception propagates to FastMCP.
    try:
        uploads.add_chunk(upload_id, data, chunk_index)
    except KeyError as exc:
        return json.dumps({"error": str(exc)})
    else:
        return json.dumps({"status": "ok", "chunk_index": chunk_index})
|
|
|
|
|
|
@mcp.tool()
async def kb_upload_finish(
    upload_id: str,
) -> str:
    """Finish a chunked upload and submit the file for ingestion.

    Reassembles all uploaded chunks and forwards the complete file to the
    engine for processing. Returns the ingestion job ID.

    Args:
        upload_id: The upload ID from kb_upload_start.
    """
    # Only the upload-session step is guarded. A KeyError there (presumably an
    # unknown/expired upload_id — depends on uploads.finish_upload) becomes a
    # JSON error payload. The engine call sits outside the try so that an
    # unrelated KeyError raised while submitting the file is no longer
    # misreported as an upload-session error; it propagates to FastMCP.
    try:
        filename, file_bytes, tags = uploads.finish_upload(upload_id)
    except KeyError as e:
        return json.dumps({"error": str(e)})

    result = engine.upload_file(filename, file_bytes, tags)
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bulk operation tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
async def kb_bulk_delete(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    force: bool = False,
) -> str:
    """Permanently delete multiple documents matching a filter.

    Removes matched documents and all associated data (chunks, embeddings, tags,
    stored files). This action cannot be undone.

    Selection filters combine with AND logic — at least one is required.

    A safety threshold applies: if the operation would affect more than 70% of
    all documents, it is rejected unless force=true.

    Args:
        document_ids: Delete documents with these specific IDs.
        tags: Delete documents that have ALL of these tags (selection filter).
        doc_type: Delete documents of this type (e.g. "note", "pdf").
        from_id: Delete documents with id >= this value.
        to_id: Delete documents with id <= this value.
        force: Override the safety threshold if it would block the operation.
    """
    # Filter validation and the safety threshold are left to the engine; this
    # tool only forwards the selection as keyword arguments.
    selection = dict(
        document_ids=document_ids,
        tags=tags,
        doc_type=doc_type,
        from_id=from_id,
        to_id=to_id,
    )
    outcome = engine.bulk_delete(**selection, force=force)
    return json.dumps(outcome, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_bulk_tags(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    add: list[str] | None = None,
    remove: list[str] | None = None,
    force: bool = False,
) -> str:
    """Add and/or remove tags on multiple documents matching a filter.

    Selection filters combine with AND logic — at least one is required.
    Note: the 'tags' parameter is a SELECTION FILTER (which documents to target),
    while 'add' and 'remove' specify the TAG CHANGES to apply to those documents.

    Args:
        document_ids: Target documents with these specific IDs.
        tags: Target documents that have ALL of these tags (selection filter).
        doc_type: Target documents of this type.
        from_id: Target documents with id >= this value.
        to_id: Target documents with id <= this value.
        add: Tags to add to matched documents.
        remove: Tags to remove from matched documents.
        force: Override the safety threshold if it would block the operation.
    """
    # Which documents to touch ...
    selection = dict(
        document_ids=document_ids,
        tags=tags,
        doc_type=doc_type,
        from_id=from_id,
        to_id=to_id,
    )
    # ... and what to change on them.
    changes = dict(add=add, remove=remove)
    outcome = engine.bulk_tags(**selection, **changes, force=force)
    return json.dumps(outcome, indent=2)
|
|
|
|
|
|
@mcp.tool()
async def kb_bulk_set_tags(
    document_ids: list[int] | None = None,
    tags: list[str] | None = None,
    doc_type: str | None = None,
    from_id: int | None = None,
    to_id: int | None = None,
    new_tags: list[str] | None = None,
    force: bool = False,
) -> str:
    """Replace all tags on multiple documents with a new set.

    Removes ALL existing tags from matched documents, then applies the new tag set.
    Selection filters combine with AND logic — at least one is required.
    Note: the 'tags' parameter is a SELECTION FILTER (which documents to target),
    while 'new_tags' is the REPLACEMENT tag set to apply.

    Args:
        document_ids: Target documents with these specific IDs.
        tags: Target documents that have ALL of these tags (selection filter).
        doc_type: Target documents of this type.
        from_id: Target documents with id >= this value.
        to_id: Target documents with id <= this value.
        new_tags: The replacement tag set to apply to all matched documents.
        force: Override the safety threshold if it would block the operation.
    """
    # Selection criteria are forwarded as-is; the engine applies them.
    selection = dict(
        document_ids=document_ids,
        tags=tags,
        doc_type=doc_type,
        from_id=from_id,
        to_id=to_id,
    )
    outcome = engine.bulk_set_tags(**selection, new_tags=new_tags, force=force)
    return json.dumps(outcome, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth middleware
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class BearerAuthMiddleware(BaseHTTPMiddleware):
    """Reject HTTP requests that lack the configured Bearer token.

    Auth is enforced only when ``config.KB_MCP_API_KEY`` is non-empty. With no
    key configured, every request is passed through unchanged.
    """

    async def dispatch(self, request: Request, call_next):
        """Forward authorised requests; answer the rest with 401 JSON.

        Args:
            request: The incoming Starlette request.
            call_next: Continuation that runs the wrapped app.

        Returns:
            The downstream response, or a 401 ``JSONResponse`` carrying
            ``WWW-Authenticate: Bearer``.
        """
        # Local import, matching the file's existing local-import style.
        import hmac

        expected = config.KB_MCP_API_KEY
        if not expected:
            return await call_next(request)

        scheme, _, token = request.headers.get("authorization", "").partition(" ")
        # Auth schemes are case-insensitive (RFC 7235 §2.1). The token is
        # compared in constant time so response timing does not leak how much
        # of the key matched. Comparing bytes avoids the TypeError that
        # compare_digest raises for non-ASCII str input.
        if scheme.lower() == "bearer" and hmac.compare_digest(
            token.encode("utf-8"), expected.encode("utf-8")
        ):
            return await call_next(request)

        return JSONResponse(
            status_code=401,
            content={"error": "Unauthorized"},
            # RFC 6750 §3: a 401 must advertise the expected auth scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ASGI app assembly
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_app():
    """Build the top-level ASGI application.

    Mounts the FastMCP streamable-HTTP app at ``/`` behind
    ``BearerAuthMiddleware``. On startup it also launches the uploads
    cleanup task and then chains into the MCP app's own lifespan.
    """
    from contextlib import asynccontextmanager

    mcp_app = mcp.streamable_http_app()

    @asynccontextmanager
    async def lifespan(app):
        uploads.start_cleanup_task()
        logger.info("Upload cleanup task started")

        # A mounted sub-app's lifespan is not run automatically, so enter the
        # MCP app's lifespan context ourselves when it exposes one.
        has_inner_lifespan = hasattr(mcp_app, 'router') and hasattr(
            mcp_app.router, 'lifespan_context'
        )
        if not has_inner_lifespan:
            yield
            return
        async with mcp_app.router.lifespan_context(app):
            yield

    root_routes = [Mount("/", app=mcp_app)]
    root_middleware = [Middleware(BearerAuthMiddleware)]
    return Starlette(
        routes=root_routes,
        middleware=root_middleware,
        lifespan=lifespan,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script.
    import uvicorn

    port = config.KB_MCP_PORT
    logger.info(
        "Starting kb MCP server on port %d, engine=%s",
        port,
        config.KB_ENGINE_URL,
    )
    # Binds on all interfaces. Access is constrained by the DNS-rebinding
    # settings and, when an API key is configured, by Bearer auth.
    uvicorn.run(create_app(), host="0.0.0.0", port=port)
|