Files
kb/mcp/uploads.py
T
steve e7136a4a20 Add MCP server, note mutation endpoint, and updated_at tracking (v3.0.0)
New MCP server (mcp/) exposes kb operations as native MCP tools over
Streamable HTTP with Bearer token auth. Supports collections via tag
conventions, chunked file uploads, and agent-side search patterns.

Engine gains PATCH /api/v1/notes/{id} for in-place note updates with
transactional re-chunk/re-embed, and updated_at column on documents.

Go client adds updatenote command and Patch HTTP method.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 21:34:55 +01:00

97 lines
2.9 KiB
Python

"""Chunked upload staging management."""
import asyncio
import base64
import logging
import shutil
import tempfile
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
logger = logging.getLogger("kb.mcp.uploads")

# Staged uploads older than this many seconds are treated as abandoned by
# the background cleanup task (10 minutes).
UPLOAD_TIMEOUT_SECONDS = 10 * 60
@dataclass
class StagedUpload:
    """Book-keeping for one in-progress chunked upload.

    Each received chunk is written to its own file inside ``staging_dir`` and
    recorded in ``chunks`` under the index the client supplied.
    """

    # Identifier returned to the client by start_upload().
    upload_id: str
    # Original file name, handed back unchanged by finish_upload().
    filename: str
    # Size in bytes announced by the client when the upload was started.
    total_size: int
    # Tags to attach to the document once the upload is finished.
    tags: list[str]
    # Private temporary directory holding this upload's chunk files.
    staging_dir: Path
    # Wall-clock creation time (time.time()), used by the abandoned-upload sweep.
    created_at: float = field(default_factory=time.time)
    # Chunk index -> path of the file holding that chunk's decoded bytes.
    chunks: dict[int, Path] = field(default_factory=dict)
# In-memory registry of uploads that have been started but not yet
# finished or swept, keyed by upload ID.
_uploads: dict[str, StagedUpload] = dict()
# Handle to the background sweeper created by start_cleanup_task();
# None until the sweeper has been started.
_cleanup_task: asyncio.Task | None = None
def start_upload(filename: str, total_size: int, tags: list[str]) -> str:
    """Register a new chunked upload and return its ID.

    A random UUID4 string is generated as the upload ID, a fresh temporary
    staging directory is created for its chunks, and the upload is recorded
    in the module registry.

    Args:
        filename: Name of the file being uploaded.
        total_size: Size in bytes the client says the file will have.
        tags: Tags to return alongside the file from finish_upload().

    Returns:
        The new upload ID, to be passed to add_chunk() and finish_upload().
    """
    new_id = str(uuid.uuid4())
    staged = StagedUpload(
        upload_id=new_id,
        filename=filename,
        total_size=total_size,
        tags=tags,
        # The short ID prefix makes stray staging directories easy to match
        # to log lines.
        staging_dir=Path(tempfile.mkdtemp(prefix=f"kb_upload_{new_id[:8]}_")),
    )
    _uploads[new_id] = staged
    logger.info("Started upload %s for %s (%d bytes)", new_id, filename, total_size)
    return new_id
def add_chunk(upload_id: str, data_b64: str, chunk_index: int) -> None:
    """Decode one base64 chunk and stage it on disk.

    The decoded bytes are written to ``chunk_<index>`` (zero-padded to six
    digits) in the upload's staging directory. Sending the same index again
    overwrites the earlier chunk.

    Raises:
        KeyError: if *upload_id* is not a known, unfinished upload.
        binascii.Error: if *data_b64* is incorrectly padded. Note that
            base64.b64decode is called in its default non-validating mode,
            so characters outside the base64 alphabet are dropped silently.
    """
    if (upload := _uploads.get(upload_id)) is None:
        raise KeyError(f"Upload ID not found: {upload_id}")

    payload = base64.b64decode(data_b64)
    target = upload.staging_dir / f"chunk_{chunk_index:06d}"
    target.write_bytes(payload)
    upload.chunks[chunk_index] = target
    logger.info(
        "Added chunk %d to upload %s (%d bytes)",
        chunk_index,
        upload_id,
        len(payload),
    )
def _warn_if_incomplete(upload: "StagedUpload", indices: list[int], size: int) -> None:
    """Log a warning when a reassembled upload looks incomplete or inconsistent.

    *indices* must be the upload's chunk indices in ascending order and
    *size* the length in bytes of the reassembled file.
    """
    # Chunk indices are dict keys and therefore unique, so the sorted list
    # has no gaps exactly when it spans (last - first + 1) values.
    if indices and indices[-1] - indices[0] + 1 != len(indices):
        logger.warning(
            "Upload %s has gaps in its chunk indices (%d chunks spanning %d..%d)",
            upload.upload_id,
            len(indices),
            indices[0],
            indices[-1],
        )
    if size != upload.total_size:
        logger.warning(
            "Upload %s reassembled to %d bytes but %d bytes were declared",
            upload.upload_id,
            size,
            upload.total_size,
        )


def finish_upload(upload_id: str) -> tuple[str, bytes, list[str]]:
    """Reassemble chunks and return (filename, file_bytes, tags).

    Chunks are concatenated in ascending chunk-index order. The staged upload
    (registry entry and staging directory) is removed whether or not
    reassembly succeeds, so a given upload ID can be finished only once.

    Missing chunk indices, or an assembled size that differs from the
    ``total_size`` declared in start_upload(), are logged as warnings rather
    than silently ignored. The bytes are still returned so existing callers
    keep working.

    Raises:
        KeyError: if *upload_id* is not a known, unfinished upload.
        OSError: if a staged chunk file cannot be read.
    """
    upload = _uploads.get(upload_id)
    if upload is None:
        raise KeyError(f"Upload ID not found: {upload_id}")
    try:
        indices = sorted(upload.chunks)
        file_bytes = b"".join(upload.chunks[idx].read_bytes() for idx in indices)
        _warn_if_incomplete(upload, indices, len(file_bytes))
        return upload.filename, file_bytes, upload.tags
    finally:
        _cleanup_upload(upload_id)
def _cleanup_upload(upload_id: str) -> None:
    """Drop *upload_id* from the registry and delete its staging directory.

    Unknown IDs are ignored. Errors while deleting the directory are
    suppressed (``ignore_errors=True``).
    """
    staged = _uploads.pop(upload_id, None)
    if staged is None:
        return
    if staged.staging_dir.exists():
        shutil.rmtree(staged.staging_dir, ignore_errors=True)
def _last_activity(upload: "StagedUpload") -> float:
    """Return the most recent wall-clock time at which *upload* was touched.

    This is the later of the upload's creation time and the modification time
    of its newest staged chunk file. An upload that keeps receiving chunks
    therefore does not look abandoned merely because it was started long ago.
    """
    latest = upload.created_at
    # Snapshot the values: stat() is I/O, and the chunk dict may gain entries
    # while we work.
    for chunk_path in list(upload.chunks.values()):
        try:
            latest = max(latest, chunk_path.stat().st_mtime)
        except OSError:
            # A chunk file that vanished or cannot be stat'ed contributes nothing.
            continue
    return latest


async def cleanup_abandoned_uploads() -> None:
    """Background task that removes uploads idle for longer than the timeout.

    Every 60 seconds, each staged upload whose last activity (creation or most
    recent chunk write) is more than ``UPLOAD_TIMEOUT_SECONDS`` ago is removed
    from the registry and its staging directory deleted. The loop runs until
    the task is cancelled. An unexpected error in one sweep is logged and does
    not stop later sweeps.
    """
    while True:
        await asyncio.sleep(60)
        try:
            now = time.time()
            # Iterate a snapshot so registrations/removals during the sweep
            # cannot invalidate the iteration.
            expired = [
                uid
                for uid, u in list(_uploads.items())
                if now - _last_activity(u) > UPLOAD_TIMEOUT_SECONDS
            ]
            for uid in expired:
                logger.warning("Cleaning up abandoned upload %s", uid)
                _cleanup_upload(uid)
        except Exception:
            # Keep the sweeper alive: if this task died, staged uploads would
            # accumulate on disk for the life of the process. Cancellation
            # (asyncio.CancelledError, a BaseException) still propagates.
            logger.exception("Error while cleaning up abandoned uploads")
def start_cleanup_task() -> None:
    """Start the abandoned-upload sweeper unless one is already running.

    The task handle is kept in the module-level ``_cleanup_task`` and a new
    task is created only when none exists or the previous one has finished.
    asyncio.create_task requires a running event loop, so call this from
    async context (e.g. a server startup hook).
    """
    global _cleanup_task
    if _cleanup_task is not None and not _cleanup_task.done():
        return  # a sweeper is already active
    _cleanup_task = asyncio.create_task(cleanup_abandoned_uploads())