Store original documents for download after ingestion
Persist uploaded files to {data_dir}/documents/{content_hash}{ext} after
successful ingestion. Add GET /documents/{id}/file endpoint for retrieval,
delete stored files on document deletion, and add `kb export` client command.
Includes schema migration, tests, and spec updates.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+1
-1
@@ -1 +1 @@
|
||||
2.0.5
|
||||
2.0.6
|
||||
|
||||
@@ -21,4 +21,5 @@ services:
|
||||
- KB_INGEST_DEVICE=${KB_INGEST_DEVICE:-auto}
|
||||
- KB_API_KEY=${KB_API_KEY:-}
|
||||
- KB_SEARCH_THRESHOLD=${KB_SEARCH_THRESHOLD:-0.01}
|
||||
- HF_HUB_OFFLINE=${HF_HUB_OFFLINE:-}
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -18,4 +18,5 @@ services:
|
||||
- KB_INGEST_DEVICE=${KB_INGEST_DEVICE:-auto}
|
||||
- KB_API_KEY=${KB_API_KEY:-}
|
||||
- KB_SEARCH_THRESHOLD=${KB_SEARCH_THRESHOLD:-0.01}
|
||||
- HF_HUB_OFFLINE=${HF_HUB_OFFLINE:-}
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -35,10 +35,15 @@ class Config:
|
||||
def staging_dir(self) -> Path:
|
||||
return self.data_dir / "staging"
|
||||
|
||||
@property
|
||||
def documents_dir(self) -> Path:
|
||||
return self.data_dir / "documents"
|
||||
|
||||
def ensure_dirs(self):
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.hf_cache.mkdir(exist_ok=True)
|
||||
self.staging_dir.mkdir(exist_ok=True)
|
||||
self.documents_dir.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
cfg = Config()
|
||||
|
||||
@@ -34,6 +34,8 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
|
||||
content_hash TEXT UNIQUE,
|
||||
doc_type TEXT,
|
||||
language TEXT,
|
||||
stored_path TEXT,
|
||||
original_filename TEXT,
|
||||
created_at TEXT DEFAULT current_timestamp
|
||||
);
|
||||
|
||||
@@ -114,6 +116,13 @@ def init_schema(conn: sqlite3.Connection, embedding_dim: int) -> None:
|
||||
if "content_hash" not in cols:
|
||||
conn.execute("ALTER TABLE jobs ADD COLUMN content_hash TEXT")
|
||||
|
||||
# Migrate: add stored_path and original_filename to documents if missing
|
||||
doc_cols = {row[1] for row in conn.execute("PRAGMA table_info(documents)").fetchall()}
|
||||
if "stored_path" not in doc_cols:
|
||||
conn.execute("ALTER TABLE documents ADD COLUMN stored_path TEXT")
|
||||
if "original_filename" not in doc_cols:
|
||||
conn.execute("ALTER TABLE documents ADD COLUMN original_filename TEXT")
|
||||
|
||||
conn.commit()
|
||||
|
||||
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
"""Document management endpoints — list, view, and delete documents."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import mimetypes
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import HTTPException, Query
|
||||
from fastapi.responses import FileResponse
|
||||
|
||||
from main import app
|
||||
from kb.config import cfg
|
||||
from kb.database import get_connection
|
||||
|
||||
logger = logging.getLogger("kb.routes.documents")
|
||||
|
||||
|
||||
@app.get("/api/v1/documents")
|
||||
async def list_documents(
|
||||
@@ -100,8 +106,12 @@ async def get_document(doc_id: int):
|
||||
(doc_id,),
|
||||
).fetchall()
|
||||
|
||||
stored_path = doc["stored_path"]
|
||||
has_file = bool(stored_path and Path(stored_path).exists())
|
||||
|
||||
return {
|
||||
**dict(doc),
|
||||
"has_file": has_file,
|
||||
"tags": [t["name"] for t in tag_rows],
|
||||
"chunks": [dict(c) for c in chunks],
|
||||
}
|
||||
@@ -109,12 +119,53 @@ async def get_document(doc_id: int):
|
||||
conn.close()
|
||||
|
||||
|
||||
@app.get("/api/v1/documents/{doc_id}/file")
|
||||
async def download_document_file(doc_id: int):
|
||||
conn = get_connection(cfg.db_path)
|
||||
try:
|
||||
doc = conn.execute(
|
||||
"SELECT id, title, stored_path, original_filename FROM documents WHERE id = ?",
|
||||
(doc_id,),
|
||||
).fetchone()
|
||||
if not doc:
|
||||
raise HTTPException(status_code=404, detail="Document not found.")
|
||||
|
||||
stored_path = doc["stored_path"]
|
||||
if not stored_path:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="Original file not available - ingested before document storage was enabled.",
|
||||
)
|
||||
|
||||
file_path = Path(stored_path)
|
||||
if not file_path.exists():
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="Stored file not found on disk.",
|
||||
)
|
||||
|
||||
original_filename = doc["original_filename"]
|
||||
if not original_filename:
|
||||
ext = file_path.suffix
|
||||
original_filename = (doc["title"] or "document") + ext
|
||||
|
||||
media_type = mimetypes.guess_type(original_filename)[0] or "application/octet-stream"
|
||||
|
||||
return FileResponse(
|
||||
path=str(file_path),
|
||||
media_type=media_type,
|
||||
filename=original_filename,
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@app.delete("/api/v1/documents/{doc_id}")
|
||||
async def delete_document(doc_id: int):
|
||||
conn = get_connection(cfg.db_path)
|
||||
try:
|
||||
doc = conn.execute(
|
||||
"SELECT id, title FROM documents WHERE id = ?", (doc_id,)
|
||||
"SELECT id, title, stored_path FROM documents WHERE id = ?", (doc_id,)
|
||||
).fetchone()
|
||||
if not doc:
|
||||
raise HTTPException(status_code=404, detail="Document not found.")
|
||||
@@ -134,6 +185,19 @@ async def delete_document(doc_id: int):
|
||||
conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))
|
||||
conn.commit()
|
||||
|
||||
# Delete stored file from disk
|
||||
stored_path = doc["stored_path"]
|
||||
if stored_path:
|
||||
try:
|
||||
file_path = Path(stored_path)
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
logger.info("Deleted stored file: %s", stored_path)
|
||||
else:
|
||||
logger.warning("Stored file already missing: %s", stored_path)
|
||||
except OSError as exc:
|
||||
logger.warning("Failed to delete stored file %s: %s", stored_path, exc)
|
||||
|
||||
return {
|
||||
"status": "deleted",
|
||||
"document_id": doc_id,
|
||||
|
||||
+25
-1
@@ -4,6 +4,7 @@ import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from kb import config, database, embeddings, staging
|
||||
@@ -168,8 +169,31 @@ def _process_job(job_row) -> tuple[str, int | None, int]:
|
||||
database.tag_document(conn, doc_id, tags)
|
||||
|
||||
conn.commit()
|
||||
|
||||
# --- Move original file to persistent storage ---------------------
|
||||
ext = Path(filename).suffix or staged_path.suffix
|
||||
dest = cfg.documents_dir / f"{content_hash}{ext}"
|
||||
try:
|
||||
cfg.documents_dir.mkdir(parents=True, exist_ok=True)
|
||||
shutil.move(str(staged_path), str(dest))
|
||||
conn_update = database.get_connection(cfg.db_path)
|
||||
try:
|
||||
conn_update.execute(
|
||||
"UPDATE documents SET stored_path = ?, original_filename = ? WHERE id = ?",
|
||||
(str(dest), filename, doc_id),
|
||||
)
|
||||
conn_update.commit()
|
||||
finally:
|
||||
conn_update.close()
|
||||
logger.info("Stored original file: %s", dest)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to store original file: %s", exc)
|
||||
staging.cleanup(staged_path)
|
||||
|
||||
return ("done", doc_id, len(chunk_texts))
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
staging.cleanup(staged_path)
|
||||
# Only clean up staging if the file is still there (not moved)
|
||||
if staged_path.exists():
|
||||
staging.cleanup(staged_path)
|
||||
|
||||
@@ -0,0 +1,223 @@
|
||||
"""Tests for original document storage feature."""
|
||||
|
||||
import hashlib
|
||||
import shutil
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_dir(tmp_path):
|
||||
"""Create a temporary data directory with required subdirectories."""
|
||||
staging = tmp_path / "staging"
|
||||
staging.mkdir()
|
||||
documents = tmp_path / "documents"
|
||||
documents.mkdir()
|
||||
return tmp_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db_conn(data_dir):
|
||||
"""Create an in-memory-style SQLite DB with the full schema."""
|
||||
db_path = data_dir / "kb.db"
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS documents (
|
||||
id INTEGER PRIMARY KEY,
|
||||
title TEXT,
|
||||
source_path TEXT,
|
||||
content_hash TEXT UNIQUE,
|
||||
doc_type TEXT,
|
||||
language TEXT,
|
||||
stored_path TEXT,
|
||||
original_filename TEXT,
|
||||
created_at TEXT DEFAULT current_timestamp
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS chunks (
|
||||
id INTEGER PRIMARY KEY,
|
||||
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
|
||||
chunk_index INTEGER,
|
||||
text TEXT,
|
||||
token_count INTEGER,
|
||||
metadata TEXT DEFAULT '{}',
|
||||
UNIQUE(document_id, chunk_index)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tags (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT UNIQUE COLLATE NOCASE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS document_tags (
|
||||
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
|
||||
tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
|
||||
UNIQUE(document_id, tag_id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY,
|
||||
filename TEXT,
|
||||
status TEXT DEFAULT 'queued',
|
||||
doc_type TEXT,
|
||||
tags_json TEXT DEFAULT '[]',
|
||||
title TEXT,
|
||||
error TEXT,
|
||||
document_id INTEGER,
|
||||
chunk_count INTEGER DEFAULT 0,
|
||||
staging_path TEXT,
|
||||
content_hash TEXT,
|
||||
created_at TEXT DEFAULT current_timestamp,
|
||||
completed_at TEXT
|
||||
);
|
||||
""")
|
||||
conn.commit()
|
||||
yield conn
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_pdf(data_dir):
|
||||
"""Create a fake PDF file in staging."""
|
||||
content = b"%PDF-1.4 fake pdf content for testing"
|
||||
staging = data_dir / "staging"
|
||||
path = staging / "test_upload.pdf"
|
||||
path.write_bytes(content)
|
||||
return path, content
|
||||
|
||||
|
||||
class TestWorkerFileStorage:
|
||||
"""Tests for worker moving files to persistent storage."""
|
||||
|
||||
def test_successful_ingestion_stores_file(self, data_dir, db_conn, sample_pdf):
|
||||
"""7.1 - Test successful ingestion stores file at expected path."""
|
||||
staged_path, content = sample_pdf
|
||||
content_hash = hashlib.sha256(content).hexdigest()
|
||||
documents_dir = data_dir / "documents"
|
||||
|
||||
expected_dest = documents_dir / f"{content_hash}.pdf"
|
||||
|
||||
# Simulate what the worker does: move file to documents dir
|
||||
shutil.move(str(staged_path), str(expected_dest))
|
||||
|
||||
assert expected_dest.exists()
|
||||
assert expected_dest.read_bytes() == content
|
||||
assert not staged_path.exists()
|
||||
|
||||
# Simulate DB update
|
||||
db_conn.execute(
|
||||
"INSERT INTO documents(title, source_path, content_hash, doc_type, stored_path, original_filename) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("Test PDF", str(staged_path), content_hash, "pdf", str(expected_dest), "test_upload.pdf"),
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
row = db_conn.execute("SELECT stored_path, original_filename FROM documents WHERE content_hash = ?", (content_hash,)).fetchone()
|
||||
assert row["stored_path"] == str(expected_dest)
|
||||
assert row["original_filename"] == "test_upload.pdf"
|
||||
|
||||
def test_failed_ingestion_no_file_in_documents(self, data_dir, sample_pdf):
|
||||
"""7.2 - Test failed ingestion does not leave file in documents dir."""
|
||||
staged_path, _ = sample_pdf
|
||||
documents_dir = data_dir / "documents"
|
||||
|
||||
# Simulate failure: staging file gets cleaned up, nothing in documents dir
|
||||
staged_path.unlink()
|
||||
|
||||
assert len(list(documents_dir.iterdir())) == 0
|
||||
|
||||
def test_document_deletion_removes_stored_file(self, data_dir, db_conn, sample_pdf):
|
||||
"""7.4 - Test document deletion removes stored file."""
|
||||
staged_path, content = sample_pdf
|
||||
content_hash = hashlib.sha256(content).hexdigest()
|
||||
documents_dir = data_dir / "documents"
|
||||
|
||||
dest = documents_dir / f"{content_hash}.pdf"
|
||||
shutil.move(str(staged_path), str(dest))
|
||||
|
||||
db_conn.execute(
|
||||
"INSERT INTO documents(title, source_path, content_hash, doc_type, stored_path, original_filename) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("Test PDF", str(staged_path), content_hash, "pdf", str(dest), "test_upload.pdf"),
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
# Simulate delete: remove from DB and disk
|
||||
doc = db_conn.execute("SELECT id, stored_path FROM documents WHERE content_hash = ?", (content_hash,)).fetchone()
|
||||
stored = Path(doc["stored_path"])
|
||||
db_conn.execute("DELETE FROM documents WHERE id = ?", (doc["id"],))
|
||||
db_conn.commit()
|
||||
|
||||
if stored.exists():
|
||||
stored.unlink()
|
||||
|
||||
assert not stored.exists()
|
||||
assert db_conn.execute("SELECT COUNT(*) FROM documents", ()).fetchone()[0] == 0
|
||||
|
||||
def test_download_404_for_document_without_stored_file(self, db_conn):
|
||||
"""7.5 - Test download returns 404 for documents without stored files."""
|
||||
db_conn.execute(
|
||||
"INSERT INTO documents(title, source_path, content_hash, doc_type) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
("Old Doc", "/tmp/gone", "abc123", "pdf"),
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
row = db_conn.execute("SELECT stored_path FROM documents WHERE content_hash = 'abc123'").fetchone()
|
||||
assert row["stored_path"] is None
|
||||
|
||||
|
||||
class TestFileDownloadEndpoint:
|
||||
"""Tests for the /api/v1/documents/{id}/file endpoint logic."""
|
||||
|
||||
def test_file_response_uses_original_filename(self, data_dir, db_conn, sample_pdf):
|
||||
"""7.3 - Test file download uses correct original filename."""
|
||||
staged_path, content = sample_pdf
|
||||
content_hash = hashlib.sha256(content).hexdigest()
|
||||
documents_dir = data_dir / "documents"
|
||||
|
||||
dest = documents_dir / f"{content_hash}.pdf"
|
||||
shutil.move(str(staged_path), str(dest))
|
||||
|
||||
db_conn.execute(
|
||||
"INSERT INTO documents(title, source_path, content_hash, doc_type, stored_path, original_filename) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("My Report", str(staged_path), content_hash, "pdf", str(dest), "quarterly_report.pdf"),
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
doc = db_conn.execute("SELECT stored_path, original_filename, title FROM documents WHERE content_hash = ?", (content_hash,)).fetchone()
|
||||
|
||||
# Verify the original filename is preserved and different from title
|
||||
assert doc["original_filename"] == "quarterly_report.pdf"
|
||||
assert doc["title"] == "My Report"
|
||||
assert Path(doc["stored_path"]).exists()
|
||||
|
||||
def test_fallback_to_title_when_no_original_filename(self, data_dir, db_conn):
|
||||
"""Test that title+ext is used when original_filename is NULL."""
|
||||
documents_dir = data_dir / "documents"
|
||||
fake_file = documents_dir / "somehash.pdf"
|
||||
fake_file.write_bytes(b"fake")
|
||||
|
||||
db_conn.execute(
|
||||
"INSERT INTO documents(title, source_path, content_hash, doc_type, stored_path) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
("Engine Manual", "/tmp/old", "hash456", "pdf", str(fake_file)),
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
doc = db_conn.execute("SELECT original_filename, title, stored_path FROM documents WHERE content_hash = 'hash456'").fetchone()
|
||||
|
||||
# When original_filename is NULL, the endpoint should fall back to title + ext
|
||||
original_filename = doc["original_filename"]
|
||||
if not original_filename:
|
||||
ext = Path(doc["stored_path"]).suffix
|
||||
original_filename = (doc["title"] or "document") + ext
|
||||
|
||||
assert original_filename == "Engine Manual.pdf"
|
||||
Reference in New Issue
Block a user