Files
kb/engine/kb/routes/jobs.py
T
steve 6fec627503 Upload-time duplicate detection, FTS5 query sanitization, release guard
- Reject duplicate uploads at the API boundary (HTTP 409) instead of
  silently skipping in the background worker. Checks both ingested
  documents and in-flight jobs via content_hash on the jobs table.
- Go client handles 409 with distinct messages for already-imported
  documents vs already-queued jobs.
- Sanitize FTS5 search queries by quoting each token to prevent syntax
  errors from special characters like ?, *, ", (), AND, OR, NOT.
- Add try/except safety net around FTS5 execute for edge cases.
- Add main branch guard to release.sh to prevent releasing from
  feature branches.
- Update specs and README to reflect new behaviour.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 23:05:07 +00:00

83 lines
2.5 KiB
Python

"""Job management endpoints — submit files/notes for ingestion and track progress."""
import hashlib
import json
from typing import Optional
from fastapi import HTTPException, UploadFile, File, Form, Query
from fastapi.responses import JSONResponse
from main import app
from kb.config import cfg
from kb.database import get_connection, create_job, get_job, list_jobs, get_document_by_hash
from kb.staging import stage_file, stage_note
@app.post("/api/v1/jobs", status_code=202)
async def submit_job(
    file: Optional[UploadFile] = File(None),
    note: Optional[str] = Form(None),
    title: Optional[str] = Form(None),
    tags: Optional[str] = Form(None),
    doc_type: Optional[str] = Form(None),
):
    """Stage an uploaded file or a text note and register an ingestion job.

    Exactly one payload is used: when both ``file`` and ``note`` are sent,
    the file wins. The payload's SHA-256 digest is looked up with
    ``get_document_by_hash``; a hit short-circuits with HTTP 409 before
    anything is staged.

    Args:
        file: Optional multipart upload.
        note: Optional note text; encoded as UTF-8 for hashing.
        title: Optional title; also used as the note's staging name
            (falls back to ``"note"``).
        tags: Optional comma-separated tag list; blanks are dropped.
        doc_type: Optional document type, forwarded to ``create_job``.

    Returns:
        ``{"job_id", "status": "queued", "filename"}`` on success, or a
        409 ``JSONResponse`` carrying ``"error": "duplicate"`` merged with
        the mapping returned by ``get_document_by_hash``.

    Raises:
        HTTPException: 422 when neither ``file`` nor ``note`` is provided.

    NOTE(review): only ``get_document_by_hash`` is consulted here; whether
    jobs that are still queued are de-duplicated depends on ``create_job``
    or code outside this view — confirm.
    """
    if not (file or note):
        raise HTTPException(
            status_code=422,
            detail="Either 'file' or 'note' must be provided.",
        )

    # Decide once which payload is in play; a file takes precedence.
    is_upload = bool(file)
    payload = await file.read() if is_upload else note.encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()

    db = get_connection(cfg.db_path)
    try:
        duplicate = get_document_by_hash(db, digest)
        if duplicate:
            return JSONResponse(
                status_code=409,
                content={"error": "duplicate", **duplicate},
            )

        if is_upload:
            job_filename = file.filename
            staged = stage_file(cfg.staging_dir, job_filename, payload)
        else:
            staged = stage_note(cfg.staging_dir, title or "note", note)
            # Notes have no client-side name, so report the staged one.
            job_filename = staged.name

        # "a, ,b" -> ["a", "b"]; a missing or empty field yields [].
        tag_names = []
        if tags:
            for piece in tags.split(","):
                cleaned = piece.strip()
                if cleaned:
                    tag_names.append(cleaned)
        encoded_tags = json.dumps(tag_names)

        job_id = create_job(
            db,
            job_filename,
            str(staged),
            doc_type,
            encoded_tags,
            title,
            digest,
        )
        return {"job_id": job_id, "status": "queued", "filename": job_filename}
    finally:
        db.close()
@app.get("/api/v1/jobs")
async def list_all_jobs(status: Optional[str] = Query(None)):
    """Return the rows produced by ``list_jobs`` as a list of plain dicts.

    Args:
        status: Optional query parameter, forwarded to ``list_jobs``
            unchanged (presumably a status filter — confirm in
            ``kb.database``).
    """
    db = get_connection(cfg.db_path)
    try:
        # Convert each row to a dict while the connection is still open.
        return list(map(dict, list_jobs(db, status)))
    finally:
        db.close()
@app.get("/api/v1/jobs/{job_id}")
async def get_single_job(job_id: int):
    """Return one job as a dict.

    Args:
        job_id: Identifier taken from the URL path and passed to ``get_job``.

    Raises:
        HTTPException: 404 when ``get_job`` returns a falsy result.
    """
    db = get_connection(cfg.db_path)
    try:
        record = get_job(db, job_id)
        if record:
            return dict(record)
        raise HTTPException(status_code=404, detail="Job not found.")
    finally:
        db.close()